gpt4 book ai didi

java - 有条件地将 Mono 与 Flux 结合起来

转载 作者:行者123 更新时间:2023-12-01 16:49:01 25 4
gpt4 key购买 nike

我需要合并两个响应式(Reactive)发布器(Mono 和 Flux)的结果。我尝试使用 zipjoin 函数来完成此操作,但我无法满足两个特定条件:

  1. 结果应包含与 Flux 发出的元素一样多的元素,但相应的 Mono 源应仅调用一次(仅此条件可以通过 join 实现)
  2. 当 Flux 为空时,链应该完成而无需等待 Mono 元素

第一个条件的解决方案在 Combine Mono with Flux 中给出。条目(粘贴在下面)。但我无法在不阻塞链条的情况下实现第二个条件 - 我想避免这种情况。

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();

System.out.println(list);

最佳答案

如果您想在 Flux 为空时取消整个操作,您可以执行以下操作

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

//Throw exception if flux is empty
flux = flux.switchIfEmpty(Flux.error(IllegalStateException::new));

List<String> list = flux
.join(mono, s -> Flux.never() , s -> Flux.never(), (x, y) -> x + y)
//Catch exception and return nothing
.onErrorResume(s -> Flux.empty())
.collectList().block();

System.out.println(list);

如果您希望 Mono 完成但不希望 join 挂起,您可以执行以下操作

DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

List<String> list = flux
.doOnComplete(sink::complete)
.join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();

System.out.println(list);

关于java - 有条件地将 Mono 与 Flux 结合起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61730444/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com