gpt4 book ai didi

project-reactor - Reactor 3.x - 限制 groupBy Flux 的时间

转载 作者:行者123 更新时间:2023-12-01 00:21:59 28 4
gpt4 key购买 nike

不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;

并且遇到 Flux 挂起的情况,假设是因为组数大于 flatMap的并发。我可以增加 flatMap并发,但没有简单的方法来判断最大可能的大小是多少。相反,我知道 Foo正在按 Foo.key 分组将在时间/发布顺序上彼此接近,并且宁愿在 groupBy Flux 与 flatMap 并发性上使用某种时间窗口(并且以具有相同 key() 的两个不同组结束大不了)。

我猜 groupBy Flux 直到 someFastPubisher 才会完成onCompletes - 即 Flux 移交给 flatMap保持“开放”(尽管他们不太可能获得新事件)。

我可以通过预取 Integer.MAX 来解决这个问题。在 groupBy 或 Integer.MAX并发性 - 但有没有办法控制组的“生活”?

最佳答案

是的:您可以申请 take(Duration)组,以确保它们提前关闭,之后将打开一个具有相同 key 的新组:

source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();

关于project-reactor - Reactor 3.x - 限制 groupBy Flux 的时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47893575/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com