gpt4 book ai didi

java - 将未知数量的并行子任务合并为一个 Completable

转载 作者:行者123 更新时间:2023-12-01 20:54:59 26 4
gpt4 key购买 nike

我需要创建并发网络请求。根据这些请求的结果,可能会启动更多请求。

我希望获得一个可完成的任务,该任务可以在所有请求完成后完成,并且无需创建进一步的请求。

我的问题是,是否可以使用以下代码 fragment 来实现:

return Completable.defer(() -> {
startRequests();
return Observable.merge(requestSubject.asObservable()).toCompletable();
});

在此示例中,startRequest 会将网络请求 (Retrofit) 添加到 requestSubject,即 PublishSubject<Observable<SomeResponse>> .

具体来说,我希望网络请求一旦订阅就会在 IO 调度程序上启动,并且返回的 Completable 不会完成,直到我在其中一个请求的 onNext 中调用 requestSubject.onComplete() 。 。

我还没有弄清楚如何在不执行请求两次的情况下处理请求的响应(每个订阅上的改造请求)。

它是这样工作的,还是有更好的方法来实现我正在寻找的东西?谢谢!

最佳答案

只需使用 flatmap() 并将其转换为 Completable

这是一个(模拟)执行网络请求的示例,该请求在 io 池上返回 2 个项目,然后对计算池中的这些项目执行计算,一切都是并行的:

@Test
public void foo() throws Exception {
Observable.range(1, 10)
.flatMap(this::getNItemsFromNetwork)
.flatMap(this::asyncCompuatation)
.ignoreElements()
.subscribe(() -> System.out.println("onComplete"),
(t) -> System.out.println("onError"));

Thread.sleep(10000);
}

Observable<String> getNItemsFromNetwork(int count) {
return Observable.just(count)
.subscribeOn(Schedulers.io())
.doOnNext(i -> System.out.println("Executing request for " + count + " on thread: " + Thread.currentThread()))
.flatMap(number -> Observable.just("Item nr " + number + ".1", "Item nr " + number + ".2"))
.delay(random.nextInt(1000), TimeUnit.MILLISECONDS);
}

Observable<String> asyncCompuatation(String string) {
return Observable.just(string)
.subscribeOn(Schedulers.computation())
.delay(random.nextInt(1000), TimeUnit.MILLISECONDS)
.doOnNext(number -> System.out.println("Computing " + number + " on thread: " + Thread.currentThread()));
}

以及验证输出:


在线程上执行 7 的请求:Thread[RxCachedThreadScheduler-7,5,main]
在线程上执行 6 的请求:Thread[RxCachedThreadScheduler-6,5,main]
在线程上执行 5 的请求:Thread[RxCachedThreadScheduler-5,5,main]
在线程上执行 1 的请求:Thread[RxCachedThreadScheduler-1,5,main]
在线程上执行 4 的请求:Thread[RxCachedThreadScheduler-4,5,main]
在线程上执行 3 的请求:Thread[RxCachedThreadScheduler-3,5,main]
在线程上执行 8 的请求:Thread[RxCachedThreadScheduler-8,5,main]
在线程上执行 2 的请求:Thread[RxCachedThreadScheduler-2,5,main]
在线程上执行 9 的请求:Thread[RxCachedThreadScheduler-9,5,main]
在线程上执行 10 的请求:Thread[RxCachedThreadScheduler-10,5,main]
线程上的计算项目 nr 7.1:Thread[RxComputationThreadPool-5,5,main]
线程上的计算项目 nr 10.2:Thread[RxComputationThreadPool-2,5,main]
线程上的计算项目 nr 6.2:Thread[RxComputationThreadPool-1,5,main]
线程上的计算项目 nr 3.1:Thread[RxComputationThreadPool-7,5,main]
线程上的计算项目 nr 4.1:Thread[RxComputationThreadPool-7,5,main]
线程上的计算项目 nr 3.2:Thread[RxComputationThreadPool-1,5,main]
线程上的计算项目 nr 6.1:Thread[RxComputationThreadPool-7,5,main]
线程上的计算项目 nr 2.1:Thread[RxComputationThreadPool-7,5,main]
线程上的计算项目 nr 5.2:Thread[RxComputationThreadPool-2,5,main]
线程上的计算项目 nr 5.1:Thread[RxComputationThreadPool-5,5,main]
线程上的计算项目 nr 7.2:Thread[RxComputationThreadPool-2,5,main]
线程上的计算项目 nr 2.2:Thread[RxComputationThreadPool-1,5,main]
线程上的计算项目 nr 10.1:Thread[RxComputationThreadPool-5,5,main]
线程上的计算项目 nr 9.1:Thread[RxComputationThreadPool-5,5,main]
线程上的计算项目 nr 4.2:Thread[RxComputationThreadPool-1,5,main]
线程上的计算项目 nr 9.2:Thread[RxComputationThreadPool-2,5,main]
线程上的计算项目 nr 8.1:Thread[RxComputationThreadPool-5,5,main]
线程上的计算项目 nr 8.2:Thread[RxComputationThreadPool-2,5,main]
线程上的计算项目 nr 1.1:Thread[RxComputationThreadPool-7,5,main]
线程上的计算项目 nr 1.2:Thread[RxComputationThreadPool-1,5,main]
完成时

关于java - 将未知数量的并行子任务合并为一个 Completable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42505009/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com