gpt4 book ai didi

java - RxJava 并行计算的排序输出

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:17:35 26 4
gpt4 key购买 nike

我有一个要并行执行的任务列表,但我想以与原始列表相同的顺序显示任务的结果。换句话说,如果我有任务列表 [A,B,C],我不想在显示 A-result 之前显示 B-result,但我也不想等到 A-task 完成后再开始 B -任务。

此外,我想尽快显示每个结果,换句话说,如果任务完成顺序是 B,然后是 A,然后是 C,我不想在收到 B-result 时显示任何内容,然后当我收到 A-result 时立即显示 A-result,然后显示 B-result,然后在我收到它时显示 C-result。

通过为每个任务创建一个 Observable,将它们与合并结合起来,并在计算线程池上订阅,然后编写一个订阅者,它为任何乱序接收的结果编写一个缓冲区,这当然不是一件非常棘手的事情。然而,Rx 的经验法则往往是“已经有一个运算符”,所以问题是“解决这个问题的正确 RxJava 方法是什么?”如果真的有这样的事情。

最佳答案

看起来你需要 concatEager 来完成这个任务,但是使用 1.0.15 之前的工具是有可能实现它的,而不需要“创建”Observables。这是一个例子:

Observable<Long> source1 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(10);
Observable<Long> source2 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(20);
Observable<Long> source3 = Observable.interval(100, 100, TimeUnit.MILLISECONDS).take(15);

Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);

sources.map(v -> {
Observable<Long> c = v.cache();
c.subscribe(); // to cache all
return c;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println);

缺点是它会在整个序列持续时间内保留所有值。这可以通过一种特殊的 Subject 来解决:UnicastSubject 但 RxJava 1.x 没有,而且可能不会“正式”获得。但是,您可以查看其中一个 my blog posts并为自己构建并具有以下代码:

//...
sources.map(v -> {
UnicastSubject<Long> subject = UnicastSubject.create();
v.subscribe(subject);
return subject;
})
//...

关于java - RxJava 并行计算的排序输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33102513/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com