gpt4 book ai didi

java - 等待ParallelFlux完成

转载 作者:行者123 更新时间:2023-12-01 13:02:05 24 4
gpt4 key购买 nike

我创建了一个ParallelFlux,然后使用了.sequential(),期望那时我可以计算或“减少”并行计算的结果。问题似乎是并行线程被触发,没有任何等待的线程。

使用CountDownLatch可以使我工作,但我认为我不必这样做。

TL; DR-我无法获得此代码的打印结果:

    Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);

最佳答案

在主体中执行该代码时,这很好地演示了事物的异步特性。该操作在弹性调度程序的线程上运行,并且订阅触发异步处理,因此它立即返回。

有两种方法可以将应用程序的末尾与Flux的末尾同步:

doOnNext打印并使用blockLast()

block *方法通常用于main和test中,它们别无选择,只能切换回阻塞模型(即,否则,测试/main将在处理结束之前退出)。

我们切换以doOnNext打印每个发出的项目的“副作用”,该类型专用于这种用例。然后,我们使用blockLast()阻塞直到通量完成。

Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doOnNext(System.out::println)
.blockLast();

subscribe打印,并以CountDownLatch使用doFinally

这个比较复杂,但是如果您想探索该方法的工作原理,则可以实际使用subscribe。

我们只添加了 doFinally,它会在任何完成信号(取消,错误或完成)上触发。我们使用 CountDownLatch阻塞主线程,该线程将从 doFinally内部异步计数。
CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);

关于java - 等待ParallelFlux完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60821783/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com