gpt4 book ai didi

java - 如何使用 RxJava 2 来安排重试竞赛?

转载 作者:行者123 更新时间:2023-11-30 06:39:09 25 4
gpt4 key购买 nike

假设我们有一组不稳定(有时会失败)的解析器,它们可能能够也可能无法处理给定的文件,即给定的解析器要么以一定的概率成功 p > 0,要么总是失败(p=0)。是否可以使用 RxJava 让这组解析器订阅传入文件流并“竞赛”来解析文件?

鉴于解析器最初可能会失败但仍然能够解析文件,因此有必要让它们使用某种退避策略重试。鉴于也有可能没有解析器能够处理给定的文件,因此应该限制重试次数。

使用 retryWhen 实现指数退避相对容易实现,如下所示 ( source ):

source.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);

但是,我不知道如何设置并行竞赛。看起来 amb 运算符就是我们想要的,但是将其应用于任意数量的流似乎需要使用 blockingIterable,这(我认为)违背了比赛受阻。我在互联网上找不到与 amb 这个用例相关的任何有用信息。

到目前为止,我的尝试类似于这样:

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();

Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();

fileSource
.flatMap(f -> parsers.take(numParsers)
.map(p -> p.parse(f))
.retryWhen(/* snippet from above */)
.onErrorReturn(/* some error value */)
).take(1)

Flowable 引入了 .parallel() 运算符,该运算符最近刚刚添加了 ParallelFailureHandling ( see this pr ),它有一个 RETRY 方法,但我似乎无法让可流动对象在其中一个返回后停止重试。

这个问题可以用 RxJava 解决吗?

最佳答案

做出合理的假设,即您的解析器是同步的,例如

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();

ArrayList<Flowable<T>> parserObservableList = new ArrayList<>();

for (Parser p: parserSet) {
parserObservableList.add(Flowable.fromCallable(() -> p.parse(f))
.retryWhen(/* Add your retry logic */)
.onErrorReturn(/* some error value */));
}

Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */);

应该满足您的要求。

关于java - 如何使用 RxJava 2 来安排重试竞赛?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44700985/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com