gpt4 book ai didi

java - Flux.generate 与 Consumer 和并行

转载 作者:行者123 更新时间:2023-12-02 12:10:50 27 4
gpt4 key购买 nike

我有简单的通量

Flux<Long> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next(i);
if (i == 3) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));

在单个线程中按预期工作:

flux.subscribe(System.out::println);

输出为

0 1 2 3 state: 4

但是当我切换到并行时:

flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println);

应打印状态的消费者:未调用 Number。我刚刚看到:

0 3 2 1

这是一个错误还是预期的功能?

最佳答案

我不是响应式(Reactive)专家,但在深入研究源代码后,似乎这种行为是设计使然;看来创建ParallelFlux有阻塞State Consumer调用的副作用;如果您想并行并调用 State Consumer,您可以使用:

flux.publishOn(Schedulers.elastic()).subscribe(System.out::println);

关于java - Flux.generate 与 Consumer 和并行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46562068/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com