gpt4 book ai didi

android - 如何在错误情况下非阻塞地组合多个 RxJava 链

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:06:38 24 4
gpt4 key购买 nike

我的要求:

  • N 个 Retrofit 并行调用
  • 等待所有调用完成(成功或失败)
  • 如果 k (0<= k < N) 次调用失败,它们不应阻止其他调用。想象一下,失败的调用可以返回 null 或错误对象,而成功的调用返回 ResponseBody 对象。
  • 可选:如果 RxJava 有一种简单的方法来区分每次成功或失败的调用,那很好,如果没有,我将解析响应并自己弄清楚

我有什么:

        Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();

Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
@Override
public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
return null;
}
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
Logger.e(throwable, "some error with one of the apis?");
return Observable.empty();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Logger.i("onCompleted");
}

@Override
public void onError(Throwable e) {
Logger.e(e, "onError");
}

@Override
public void onNext(Object o) {
Logger.i("onNext " + o);
}
});

我得到的输出:

some error with one of the apis?
// stacktrace of the error
onCompleted

我是 RxJava 的新手,非常困惑。我在 StackOverflow 上找到了一些答案,说 zip 做了类似的事情,但它离我的要求更远。我猜其中一个“组合”运算符 + 适当的异常处理将满足我的需要。到目前为止真的很难弄明白

我使用的版本:

compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'

最佳答案

  1. 您无法通过combineLastzip 实现并行,rxjava 将在我的测试中按顺序执行和发出您的项目。

  2. 如果您的任务之一失败,您的 Func2#call 将不会被调用,而是提交 onError。你甚至无法通过这种方式获得其他成功任务的结果。

  3. 解决方案是flatMap,这是rxjava中实现并发的传统方式。它还满足您的其他要求。

这是一个小而完整的例子。

我使用一个简单的网站服务来测试。

我用一个Semaphore 来等待所有任务完成,你完全可以忽略它。并且我在 http 请求中添加了日志记录以便更好地理解,您也可以完全忽略它。

public interface WebsiteService {

@GET
Observable<ResponseBody> website(@Url String url);

}

然后我用下面的代码用rxjava测试结果。

   HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);

Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
.build();
WebsiteService websiteService = retrofit.create(WebsiteService.class);

final Semaphore s = new Semaphore(1);
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}

Observable<ResponseBody> first = websiteService.website("http://github.com");
Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
Observable<ResponseBody> third = websiteService.website("http://notexisting.com");

final int numberOfCalls = 3; // testing for three calls

Observable.just(first, second, third)
.flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
@Override
public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
return responseBodyObservable.subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<ResponseBody>() {

private int currentDoneCalls = 0;

private void checkShouldReleaseSemaphore() {
if (currentDoneCalls >= numberOfCalls) {
s.release();
}
}

@Override
public void onSubscribe(@NonNull Disposable d) {

}

@Override
public void onNext(@NonNull ResponseBody responseBody) {
System.out.println("Retrofit call success " + responseBody.contentType());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}

@Override
public void onError(@NonNull Throwable e) {
System.out.println("Retrofit call failed " + e.getMessage());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}

@Override
public void onComplete() {
System.out.println("onComplete, All request success");
checkShouldReleaseSemaphore();
}

});

try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("All request done");
s.release();
}

我使用 rxjava2 并改造 adapter-rxjava2 进行测试。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'

已更新

RxJava2的介绍页来自github指出了实现并行的实用方法。

Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this...

虽然这个例子是基于RxJava2,但是操作flatMap是 已经存在于 RxJava 中。

关于android - 如何在错误情况下非阻塞地组合多个 RxJava 链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44643106/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com