gpt4 book ai didi

rx-java - rxjava : avoid cancelling subscription on error in downstream operations

转载 作者:行者123 更新时间:2023-12-05 07:39:36 27 4
gpt4 key购买 nike

我是 RxJava 的新手,有一个问题我无法找到合适的解决方案,即使经过数小时的研究。

它是这样的:我正在开发一个银行应用程序,它连接到某个服务器以接收定价更新、处理这些更新并将它们保存在某个内部数据库中。使用 RxJava 2,在没有错误发生的情况下很容易让一些东西工作,但如果在处理阶段出现错误,事情往往会变得难看:订阅被取消,并且不再收到消息。

我曾经使用一个临时的流媒体库,它只捕获异常,记录导致它们的错误和消息,并保持流打开。这是一种非常简单实用的错误处理方式,但它确保了应用程序相当高的健壮性。

使用 RxJava2,我能想到的唯一真正有效的方法是用捕获这些异常的包装器包围我的所有映射运算符。它感觉不是很健壮,因为任何人都可能忘记捕获错误,这会导致应用程序终止。

为了让我的案例更清楚,这是一个当前失败的测试,我想修复它:

@Test
public void shouldKeepProducingWithErrorsInMap() {

List<Integer> output = new ArrayList<>();
List<Throwable> errors = new ArrayList<>();
AtomicInteger startCount = new AtomicInteger(0);

Observable<Integer> source = Observable.defer(() -> {
startCount.incrementAndGet();
return Observable.range(1, 10);
});

source
.map(i -> {
if (i % 3 == 0) {
throw new IllegalStateException("I cannot accept multiples of 3, this is " + i);
}
return i;
})
.subscribe(output::add, errors::add, () -> log.info("done"));

Assertions.assertThat(startCount.get()).isEqualTo(1);
Assertions.assertThat(output).containsExactly(1, 2, 4, 5, 7, 8, 10);
Assertions.assertThat(errors).hasSize(0);

}

为了达到适当的健壮性水平,我想要以下内容:

  1. 通过从一些工厂方法返回一个 Observable 来封装连接到某个任意源的逻辑
  2. 避免在下游某个点发生错误时重新连接到源
  3. 如果出现下游错误,只需记录错误并转到下一条消息
  4. 在工厂方法本身中实现此“源代码保护”功能,以避免在发生意外错误时中断提要。

有没有办法在 RxJava2 中使用可用的运算符来实现这一点,或者我应该为此实现一个自定义运算符?

最佳答案

使用 onErrorReturn 运算符 ( documentation ),它可以让您在上游可观察对象抛出错误时发出一个元素。我会让可观察对象的结果对象也封装它失败的状态,并在失败时发出该对象。类似的东西。

someObservable
.map(response -> SomeResult.success(action.getMessage()))
.onErrorReturn(t -> SomeResult.failure(t.getMessage()))
.observeOn(AndroidSchedulers.mainThread())
// e.g. do something with result, or show toast when it fails.

关于rx-java - rxjava : avoid cancelling subscription on error in downstream operations,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47010594/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com