gpt4 book ai didi

java - GroupedFlux 创建的 groupBy 运算符的数量是否有限

转载 作者:行者123 更新时间:2023-11-30 06:20:42 37 4
gpt4 key购买 nike

根据document reference groupBy 运算符根据运算符的键盘映射器功能将给定的 Flux 拆分为多个 GroupedFlux。如果我使用 257 个整数执行以下代码,它可以正常工作,但不能使用 258

    public void groupByTest() {
Flux.range(1, 258)
.groupBy(val -> val)
.concatMap(g -> g.map(val -> val + "test"))
.doOnNext(System.out::println)
.blockLast();
}

这是否意味着 groupBy 运算符不能创建超过 257 个组?

最佳答案

groupBy javadoc 中所述:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

这意味着一旦发出一个组,groupBy 需要获得更多请求才能取得进展。默认情况下,它最多打开 256 个组,然后需要更多请求或检测组是否完整。并且 groupBy 无法“知道”组是否完整,直到:

  • A) 该组被取消(在这种情况下,如果稍​​后出现具有相同键的值,它将重新创建一个新组)
  • B) 源已被完全处理(只有在源小于 256 个元素、默认 groupBy prefetchgroupBy 收到 时才会发生这种情况) onComplete 来自源的信号)

val -> val 标准和 concatMap 都符合这些要求。

groupBy 标准最终会生成与值一样多的组。这里有 258 个组,而 groupBy 的默认容量可跟踪 256 个组。

Note: If the whole sequence starts less than 256 groups, it would work fine. Try setting the criteria to val -> val % 2 and see that it works. Then try to bump the range to range(1, 513) and see how it hangs again.

由于 concatMap 的工作原理,最后一次测试仅限于 512 个元素。

concatMap 在我们的例子中尤其糟糕,因为它只会订阅下一组并在第一组完成时取得进展。这与上面的条件 B) 冲突,造成 groupByconcatMap 都无法取得进展的情况。

Note: In the small example with 513, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1 and then waits for downstream to start consuming group 2. As a result, it has just too few data to detect that the group is complete, concatMap waits for that completion signal and never subscribes to group 2, hanging the whole thing.

Using a flatMap would fix that, because flatMap will subscribe to multiple groups concurrently, and 2 groups is no trouble for it: it will consume both groups and make progress.

关于java - GroupedFlux<T> 创建的 groupBy 运算符的数量是否有限,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48211297/

37 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com