gpt4 book ai didi

java - RxJava 2 - 如何在错误时取消无限流并处理它?

转载 作者:行者123 更新时间:2023-12-02 02:03:38 28 4
gpt4 key购买 nike

我有以下无限流,每秒都会执行一些操作。
我想要的是在出现错误时停止流并处理它。
我怎样才能做到这一点?

void doSomething() {
Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
(throwable -> {
Timber.e(throwable, "error happened");// Never triggered
})
.doOnNext(someClass -> Timber.i("doing the infinite stuff"))
.subscribe(Functions.emptyConsumer(), throwable -> {
Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
});
}

Observable<SomeClass> doSomethingInner() {
return Observable.error(new Exception("something went wrong"));
}

Observable<SomeClass> execute(Observable<SomeClass> source,
long delayInterval,
TimeUnit timeUnit,
Scheduler scheduler,
Function<SomeClass, Long> interval) {
return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
long currentInterval = delayInterval;

@Override
public ObservableSource<SomeClass> call() {
return Single.timer(currentInterval, timeUnit, scheduler)
.flatMapObservable(o -> source)
.doOnNext(t -> currentInterval = interval.apply(t));
}
})
.repeat()
.retry();
}

最佳答案

我认为retry()正在消耗你的错误。
尝试以下任一操作:

  • 删除此 retry()完全
  • 或将其更改为 retry(Predicate<Throwable>)决定是否重复。

如果您不提前使用流,订阅者的默认行为是在错误时取消流,并且您应该收到 onError() 的回调。里面subscribe() .

关于java - RxJava 2 - 如何在错误时取消无限流并处理它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51187121/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com