gpt4 book ai didi

java - Mono.when 在发布者完成之前触发

转载 作者:行者123 更新时间:2023-12-02 09:41:51 30 4
gpt4 key购买 nike

示例代码:

Flux<Integer> fluxSrc = Flux.<Integer> create(e -> {
e.next(1);

try {
Thread.sleep(500);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}

e.complete();
})
.publishOn(Schedulers.single())
.publish().autoConnect(2);

Flux<Integer> fluxA = fluxSrc
.publishOn(Schedulers.single())
.map(j -> 10 + j);

fluxA.subscribe(System.out::println);

Mono<Integer> monoB = fluxSrc
.publishOn(Schedulers.single())
.reduce(20, (j, k) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
return j + k;
});

monoB.subscribe(System.out::println);

Mono.when(fluxA, monoB)
.block();

System.out.println("After");

这会产生以下输出:

11
After
21

为什么它不等待两个发布者(fluxAmonoB)完成?我应该如何构建代码,以确保所有发布者在 After 到达之前完成?

最佳答案

通过使用 .publish() , fluxSrc变成热熔剂。考虑:

Hot publishers, on the other hand, do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case said subscriber would only see new elements emitted after it subscribed). For hot publishers, something does indeed happen before you subscribe.

( https://projectreactor.io/docs/core/release/reference/#reactor.hotCold )

修复此问题的一种方法是删除 publish并在冷流下运行。另一种是改.autoConnect(2);.autoConnect(3); - 那是因为您想在第三次订阅时开始处理数据 - Mono.when(fluxA, monoB).block();已达到(之前的为 fluxA.subscribemonoB.subscribe )。

编辑: When确实等待源完成,但它从之前的订阅中获得了 onComplete 信号。

可能发生的情况是:

  1. flux A 被 fluxA.subscribe(System.out::println); 订阅,发出 11 并打印出来。
  2. flux B 被 monoB.subscribe(System.out::println); 订阅并开始减少。
  3. Mono.when被订阅(触发了“多播”——通量被第二次订阅)。
  4. 减少开始,结果为 21。
  5. 另一次缩减开始,并立即完成,结果为 20(缩减空流 - 只有 FluxSrc 中的项目已被另一次缩减消耗)。
  6. flux A 向两个订阅者发送了 onComplete。
  7. flux B 在完成时发送,减少结果 = 20。它被传递给 Mono.when 进行的订阅,这就是它没有被打印的原因。
  8. 自 Mono.when 订阅以来,两个通量都在 onComplete 上发送,因此 After已打印。
  9. 大约在那时,第一次归约完成,值为 21,该值被传递到 monoB.subscribe(System.out::println);

关于java - Mono.when 在发布者完成之前触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57014566/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com