gpt4 book ai didi

java - RxJava onErrorResumeNext()

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:50:10 28 4
gpt4 key购买 nike

我有两个可观察对象(为简单起见命名为 A 和 B)和一个订阅者。因此,订阅者订阅 A,如果 A 出现错误,则 B(这是后备)启动。现在,每当 A 遇到错误时,B 都会正常调用,但是 A 会调用订阅者的 onComplete(),因此 B 会做出响应即使 B 执行成功,也永远不会到达订阅者。

这是正常行为吗?我认为 onErrorResumeNext() 应该继续流并在完成后通知订阅者,如文档中所述(https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext)。

这是我正在做的事情的整体结构(省略了几个“无聊”的代码):

public Observable<ModelA> observeGetAPI(){
return retrofitAPI.getObservableAPI1()
.flatMap(observableApi1Response -> {
ModelA model = new ModelA();

model.setApi1Response(observableApi1Response);

return retrofitAPI.getObservableAPI2()
.map(observableApi2Response -> {
// Blah blah blah...
return model;
})
.onErrorResumeNext(observeGetAPIFallback(model))
.subscribeOn(Schedulers.newThread())
})
.onErrorReturn(throwable -> {
// Blah blah blah...
return model;
})
.subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
// Blah blah blah...
return model;
}).onErrorReturn(throwable -> {
// Blah blah blah...
return model;
})
.subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI.subscribe(ModelA -> {
// IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable ->{
// WE NEVER GET HERE... onErrorResumeNext()
},
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED }
);

知道我做错了什么吗?

谢谢!

编辑:这是正在发生的事情的粗略时间表:

---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

这是我制作的简单图表的链接,它代表了我想要发生的事情: Diagram

最佳答案

下面使用的 Rx 调用应该模拟您使用 Retrofit 所做的事情。

fallbackObservable =
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
logger.v("emitting A Fallback");
subscriber.onNext("A Fallback");
subscriber.onCompleted();
}
})
.delay(1, TimeUnit.SECONDS)
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
logger.v("emitting Fallback Error");
return "Fallback Error";
}
})
.subscribeOn(Schedulers.immediate());

stringObservable =
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
logger.v("emitting B");
subscriber.onNext("B");
subscriber.onCompleted();
}
})
.delay(1, TimeUnit.SECONDS)
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
logger.v("flatMapping B");
return Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
logger.v("emitting A");
subscriber.onNext("A");
subscriber.onCompleted();
}
})
.delay(1, TimeUnit.SECONDS)
.map(new Func1<String, String>() {
@Override
public String call(String s) {
logger.v("A completes but contains invalid data - throwing error");
throw new NotImplementedException("YUCK!");
}
})
.onErrorResumeNext(fallbackObservable)
.subscribeOn(Schedulers.newThread());
}
})
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
logger.v("emitting Return Error");
return "Return Error";
}
})
.subscribeOn(Schedulers.newThread());

subscription = stringObservable.subscribe(
new Action1<String>() {
@Override
public void call(String s) {
logger.v("onNext " + s);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
logger.v("onError");
}
},
new Action0() {
@Override
public void call() {
logger.v("onCompleted");
}
});

日志语句的输出是:

RxNewThreadScheduler-1 emitting BRxComputationThreadPool-1 flatMapping BRxNewThreadScheduler-2 emitting ARxComputationThreadPool-2 A completes but contains invalid data - throwing errorRxComputationThreadPool-2 emitting A FallbackRxComputationThreadPool-1 onNext A FallbackRxComputationThreadPool-1 onCompleted

这似乎是您要找的东西,但也许我遗漏了什么。

关于java - RxJava onErrorResumeNext(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25634499/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com