gpt4 book ai didi

java - 创建三个 Mono 后立即并行执行它们,等待所有 Mono 完成并以特定顺序/逻辑收集结果

转载 作者:行者123 更新时间:2023-12-02 08:49:51 25 4
gpt4 key购买 nike

我是 Spring WebFlux 的新手,所以请保持温柔......如果我错过了一些明显的事情,我很抱歉,但我尝试寻找在线示例,每次我最终都会进行顺序调用。

我有这样的情况:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);

Response 是我项目中的一个类,但对于这个示例,我们可以将它们视为每个单个列表的简单容器。

我愿意:

  • 并行执行它们(一旦我将它们分配给 mono1/mono2/mono3,也许可以通过调用 .subscribeOn(Schedulers.parallel()) ?)
  • 完成所有操作后,将响应保存到 resp1、resp2、resp3
  • 如果 resp1 有结果(列表不为空)则返回 resp1 否则...
  • ...如果resp2有结果(列表不为空)则返回resp2,否则...
  • ...返回resp3(即使它是空的)

我如何实现这一目标?(因为我失败了多次)

我的第一次尝试是:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
return resp1;
}
if (resp2.isNotEmpty()) {
return resp2;
}
return resp3;

这似乎不起作用,ParallelFlux.from(mono1, mono2, mono3).then().block() 真的并行运行这些单声道吗?另外为什么我需要 ParallelFlux?我不能在创建每个单声道后立即说“在单独的线程上运行这个单声道”吗?每个 .block() 实际上都会重做调用...就像它重新执行单声道一样...为什么?

更新:

通过阅读评论,我将代码更改为:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();

Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();

if (resp1.hasMessages()) {
return resp1;
}
if (resp2.hasMessages()) {
return resp2;
}
return resp3;

现在看来可行了。我需要做其他事情还是我可以接受这个解决方案?我是否还应该更改 Mono.zip(mono1, mono2, mono3).subscribeOn(Schedulers.parallel()).block 中的 Mono.zip(mono1, mono2, mono3).block() ()?附注我现在再次阅读文档,我认为我应该使用 Schedulers.elastic() 而不是 Schedulers.parallel()。

最佳答案

创建单声道不会自动执行它。您需要像 subscribeblock 这样的终端操作符来触发执行(subscribeOn 不是终端操作符。您不需要它,除非您想要将执行推迟到不同的线程池。默认情况下它使用默认线程池)。如果您希望多个单声道并行运行,您可以使用 zip 运算符。

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

return Mono.zip(mono1, mono2, mono3)
.map(t -> {
if (t.getT1().isEmpty()) {
if (t.getT2().isEmpty()) {
return t.getT3();
} else {
return t.getT2();
}
} else {
return t.getT1();
}
});

注意:调用此函数不会执行并给出结果。它会返回一个单声道,您可以在其中调用 subscribe() 来获取结果。

关于java - 创建三个 Mono 后立即并行执行它们,等待所有 Mono 完成并以特定顺序/逻辑收集结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60850570/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com