gpt4 book ai didi

java - 使用 RxJava 将并行任务列表的结果组合到单个 HashMap

转载 作者:行者123 更新时间:2023-11-30 07:57:56 25 4
gpt4 key购买 nike

我有一个 List<Task> , 其中Task是一个接口(interface),只有一个方法返回 Map<String, JsonElement> .我怎样才能执行 List<Task>并行并返回一个新的 HashMap每个 Task 的组合结果?

我目前有这个:

List<Task> tasks = getTasks();

Observable.from(tasks)
.flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() {
@Override
public Observable<Map<String, JsonElement>> call(Task task) {
return Observable.just(task.get());
}
});

// group into single Map<String,JsonElement>
// create Observable<Map<String,JsonElement>> with all results

最佳答案

使用defer 封装每个订阅的Map 和一个基于您想要的线程池大小的Scheduler:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.defer(() -> {
final Map<String, JsonElement> map = new ConcurrentHashMap<>();
return Observable
.from(tasks)
.flatMap(task ->
Observable
.fromCallable(task -> task.get())
.doOnNext(mp -> map.putAll(mp))
.subscribeOn(scheduler))
.ignoreElements()
.concatWith(Observable.just(map));
});

请注意,Scheduler 的选择将取决于正在执行的任务的性质。如果 CPU 占主导地位,您可能会对 Schedulers.computation() 感到满意。

关于java - 使用 RxJava 将并行任务列表的结果组合到单个 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40828359/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com