gpt4 book ai didi

android - RxJava返回错误为onNext并继续流

转载 作者:行者123 更新时间:2023-12-02 13:17:36 27 4
gpt4 key购买 nike

所以我试图使用onErrorReturn返回我想要的结果,但是它将在之后完成流,如何捕获错误返回为Next并仍然继续流?

使用下面的代码,如果出现错误,它将不会到达retryWhen,如果我将其翻转,如果出现错误,它将不会重新订阅retryWhen

fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
actAsRepo(intent) // Might return error
.map { State(data = it, error = null) }
}
.onErrorReturn { State(data = "", error = it) } // catch the error
.retryWhen { errorObs ->
errorObs.flatMap {
Observable.just(State.defaultState()) // continue subscribing
}
}
}

private fun actAsRepo(string: String): Observable<String> {
if (string.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
return Observable.just("Wrapped from repo: $string")
}
}


订户将是

viewModel.process().subscribe(this::render)

最佳答案

onError是终端运算符(operator)。如果发生onError,它将在运算符之间传递。您可以使用onError-operator来捕获onError并提供备用。

在您的示例中,onError发生在flatMap的内部流中。 onError将向下游传播到onErrorReturn操作者。如果看一下实现,您将看到将调用onErrorReturn lambda,结果将在onComplete之后通过onNext向下游推送

    @Override
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}

if (v == null) {
NullPointerException e = new NullPointerException("The supplied value is null");
e.initCause(t);
downstream.onError(e);
return;
}

downstream.onNext(v); // <--------
downstream.onComplete(); // <--------
}

您的解决方案是什么结果?

由于以下原因,您的流完成了:#retry当JavaDoc

If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner {@code ObservableSource} signals {@code onError} or {@code onComplete} while the upstream is active, the sequence is terminated with the same signal immediately.



您应该做什么:

将onErrorReturn放置在flatMap中的 map 操作者之后。通过这种排序,当inner-flatMap流onErrors时,您的流将无法完成。

为什么是这样?

当外部(源:publishSubject)和内部流(订阅)都完成时,flatMap运算符完成。在这种情况下,外部流(publishSubject)发出onNext,内部流将在通过onNext发送{State(data =“”,error = it)}之后完成。因此,流将保持打开状态。
interface ApiCall {
fun call(s: String): Observable<String>
}

class ApiCallImpl : ApiCall {
override fun call(s: String): Observable<String> {
// important: warp call into observable, that the exception is caught and emitted as onError downstream
return Observable.fromCallable {
if (s.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
s
}
}
}
}

data class State(val data: String, val err: Throwable? = null)

apiCallImpl.call将返回一个可延迟的可观察对象,这将在订阅时引发错误,而不是在可观察的组装时抛出错误。
// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation). 
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
apiCallImpl.call(intent) // Might return error
.map { State(data = it, err = null) }
.onErrorReturn { State("", err = it) }
}
}

测试
lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl

@Before
fun init() {
publishSubject = PublishSubject.create()
apiCallImpl = ApiCallImpl()
}

@Test
fun myTest() {
val test = process().test()

publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")

test.assertNotComplete()
.assertNoErrors()
.assertValueCount(3)
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it.data).isEmpty()
assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
true
}
.assertValueAt(2) {
assertThat(it).isEqualTo(State("test2", null))
true
}
}

另类

该替代方案的行为与第一种解决方案略有不同。 flatMap-Operator带有一个 bool(boolean) 值(delayError),这将导致吞入onError消息,直到源完成为止。当源完成时,将发出错误。

当异常没有用并且在出现时一定不能记录时,可以使用delayError true

处理
fun process(): Observable<State> {
return publishSubject
.flatMap({ intent ->
apiCallImpl.call(intent)
.map { State(data = it, err = null) }
}, true)
}

测试

仅发射两个值。该错误不会转换为后备值。
@Test
fun myTest() {
val test = process().test()

publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")

test.assertNotComplete()
.assertNoErrors()
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it).isEqualTo(State("test2", null))
true
}
.assertValueCount(2)
}

注意:我认为您想在这种情况下使用switchMap而不是flatMap。

关于android - RxJava返回错误为onNext并继续流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62233881/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com