gpt4 book ai didi

java - 使用 groupBy 进行 Flux 并行串行执行

转载 作者:行者123 更新时间:2023-12-02 10:01:34 26 4
gpt4 key购买 nike

假设我有这个:

Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
.groupBy(i -> i % 3);

并说我有一个方法:

Mono<Integer> getFromService(Integer i);

我想为每个组并行调用 getFromService,但请确保每个组内的调用都是串行的。

对于上面的示例,这将是具有以下输入值的三个并行流:

stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11

我尝试过这个,但它没有达到我想要的效果:

Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))

这是一次为所有整数并行调用服务。我该如何继续?

最佳答案

使用 concatMapflatMapSequential而不是内部的 .flatMap

如果您希望在每个组内按顺序执行(即每个组内一次仅订阅一个getFromService),则使用.concatMap,像这样:

Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))

如果组内并行执行可以,但您只关心发出序列的顺序,则使用 flatMapSequential,如下所示:

Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))

另一种选择是使用 .flatMap 并将 concurrency 参数设置为 1,但我建议使用上述其中一种。

关于java - 使用 groupBy 进行 Flux 并行串行执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55582796/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com