gpt4 book ai didi

project-reactor - Reactor GroupedFlux - 等待完成

转载 作者:行者123 更新时间:2023-12-04 07:07:02 28 4
gpt4 key购买 nike

有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理?
当然,不必添加一个未知持续时间的 sleep ......

@Test
public void groupByPublishOn() throws InterruptedException {
UnicastProcessor<Integer> processor = UnicastProcessor.create();

List<Integer> results = new ArrayList<>();
Flux<Flux<Integer>> groupPublisher = processor.publish(1)
.autoConnect()
.groupBy(i -> i % 2)
.map(group -> group.publishOn(Schedulers.parallel()));

groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));

List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
input.forEach(processor::onNext);
processor.onComplete();

Thread.sleep(500);

Assert.assertTrue(results.size() == input.size());
}

最佳答案

您可以替换这些行:

 groupPublisher.log()
.subscribe(g -> g.log()
.subscribe(results::add));

有了这个
groupPublisher.log()
.flatMap(g -> g.log()
.doOnNext(results::add)
)
.blockLast();
flatMap是一种比 subscribe-within-subscribe 更好的模式,它将为您订阅该组。
doOnNext处理消耗的副作用(向集合添加值),使您无需在订阅中执行该操作。
blockLast()替换订阅,而不是让您为它阻塞的事件提供处理程序,直到完成(并返回最后发出的项目,但您已经在 doOnNext 中处理过了)。

关于project-reactor - Reactor GroupedFlux - 等待完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48583716/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com