gpt4 book ai didi

java - 使用RxJava异步调用多个同步任务

转载 作者:行者123 更新时间:2023-11-30 07:34:15 27 4
gpt4 key购买 nike

我有一个由 Futures 表示的异步任务,在一个单独的线程池中执行,我想使用 RxJava 加入该线程池。使用 Java 5 构造的“旧”方法如下(省略收集结果):

final Future<Response> future1 = wsClient.callAsync();
final Future<Response> future2 = wsClient.callAsync();
final Future<Response> future3 = wsClient.callAsync();
final Future<Response> future4 = wsClient.callAsync();

future1.get();
future2.get();
future3.get();
future4.get();

这会阻塞我当前的线程,直到所有 future 都完成,但调用将是并行的,整个操作只会花费与最长调用相同的时间。

我想使用 RxJava 做同样的事情,但在如何正确建模方面我有点菜鸟。

我尝试了以下方法,它似乎有效:

Observable.from(Arrays.asList(1,2,3,4))
.flatMap(n -> Observable.from(wsClient.callAsync(), Schedulers.io()))
.toList()
.toBlocking()
.single();

这种方法的问题是我引入了 Schedulers.io 线程池,这会导致不必要的线程切换,因为我已经阻塞了当前线程(使用 toBlocking())。有什么方法可以对 Rx 流程进行建模以并行执行任务,并阻塞直到所有任务完成?

最佳答案

您应该使用zip功能。例如这样:

Observable.zip(
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
(response1, response2, response3, response4) -> {
// This is a zipping function...
// You'll end up here when you've got all responses
// Do what you want with them and return a combined result
// ...
return null; //combined result instead of null
})
.subscribe(combinedResult -> {
// Use the combined result
});

Observable.zip 还可以与 Iterable 一起使用,因此您可以包装 Observable.from(wsClient.callAsync(), Schedulers.io()) ; 周围有一个(返回其中的 4)。

关于java - 使用RxJava异步调用多个同步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35627010/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com