gpt4 book ai didi

java - 在 flatMap 上使用reduce时,Reactor Flux订阅者流停止

转载 作者:行者123 更新时间:2023-12-02 01:47:54 26 4
gpt4 key购买 nike

我想更改单个订阅者的代码。现在我有了

auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));

这段代码工作正常,reduce方法非常简单。我尝试更改单个订阅者的代码

    auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
.flatMap(window -> window.groupBy(Auction::getItem))
.flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
.subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));

这是我的代码,该代码不起作用。没有错误,也没有结果。调试后,我发现当我减少流时,代码卡在第二个 flatMap 上。我认为问题出在 flatMap 合并上,卡在 Mono 解析上。现在有人如何解决这个问题并仅使用单个订阅者?

如何复制,可以使用另一个类或创建一个。小尺寸有效,但大尺寸则消亡

List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
Auction a = new Auction((long) i, "test");
a.setItem((long) (i%50));
auctionList.add(a);
}

Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));

这种方法立竿见影,但我使用了 3 个订阅者

Flux.fromIterable(auctionList)
.groupBy(Auction::getId)
.subscribe(
auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
)
));

最佳答案

我认为您描述的行为与 groupByflatMap 链接之间的交互有关。检查groupBy文档。它指出:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

默认情况下,maxConcurrency (flatMap) 设置为 256(我检查了 3.2.2 的源代码)。所以,选择超过 256 个组可能会导致执行挂起(特别是当所有执行都发生在同一线程上时)。

以下代码有助于理解链接运算符 groupBy 和 flatMap 时会发生什么:

@Test
public void groupAndFlatmapTest() {
val groupCount = 257;
val groupSize = 513;
val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
val source = Flux.fromIterable(list)
.groupBy(i -> i % groupCount)
.flatMap(Flux::collectList);
StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}

此代码的执行挂起。将 groupCount 更改为 256 或更少可使测试通过(对于 groupSize 的每个值)。

因此,关于您原来的问题,您很可能正在使用键选择器 Auction::getItem 创建大量组。

关于java - 在 flatMap 上使用reduce时,Reactor Flux订阅者流停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53477757/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com