gpt4 book ai didi

java - 在另一个通量结束后执行并行通量

转载 作者:行者123 更新时间:2023-12-01 18:33:20 25 4
gpt4 key购买 nike

顺便说一句,我还在学习 weblux;我不知道这是否可能,或者我的方法是错误的,但考虑到这种并行通量。

Flux<String> enablers = Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential();

谁调用了具有 Web 客户端请求的方法 (service.getAMino)

webClient.post()
.uri(url)
.headers(headers -> headers.addAll(httpHeaders))
.body(BodyInserters.fromObject(request))
.retrieve()
.bodyToMono(entity2.class);

我需要等待启用器通量的流结束并处理其中的所有响应,原因是如果其中一个给我错误或否定响应,我将不会运行另一个并行通量 for blockers

Flux<String> blockers = Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential();

我想到了“zip”方法,但是这个合并了两个响应,这不是我想要的如果有人能帮我解决这个问题。

更新

enablers. //handle enablers response and if error return a custom Mono<response> with .reduce

如果 enablers 的句柄中没有错误,则继续使用其他 Flux 进行 .thenMany

最佳答案

我在第一个flux中找到了通过条件any来做到这一点的方法,就像这样

Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential()
.any(element -> *stuff here)//condition
.flatMap(condition->{
if(condition.equals(Boolean.FALSE)){
return Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential()
.reduce(**stuff here)// handle noError response and return;
}
return Mono.just(**stuff here);//handle error response and return
});

如果有其他方法可以做到这一点,我很高兴您将其发布在这里,谢谢,:D

关于java - 在另一个通量结束后执行并行通量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60119547/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com