gpt4 book ai didi

java - Camel AggregationStrategy 为下一次聚合迭代保留排除的消息

转载 作者:行者123 更新时间:2023-11-30 02:50:02 24 4
gpt4 key购买 nike

嗨,我的输入文件看起来像这样

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

我想将其拆分并聚合为多条消息,但将关联的记录保留在一起,例如:

消息 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1
CCC,2

消息2:

DDD,1
DDD,5
DDD,4
EEE,1

并防止这样的事情:

消息 1:

AAA,1
AAA,2
AAA,3
BBB,1
CCC,1

消息2:

CCC,2
DDD,1
DDD,5
DDD,4
EEE,1

CCC,2 应写入消息 1 或 CCC,1 应写入消息 2。

completionSize 不是常量,但应该类似于阈值。关于上面的示例,“如果还有以 CCC 开头的其他记录,则将消息中的 5 条记录也聚合到消息中”。

这是我的路线:

.split().tokenize("\n").streaming()
.aggregate().constant(true)
.aggregationStrategy(new MyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)

达到completionSize阈值5后,MyAggregationStrategy必须检查下一条消息(newExchange)并决定是否将其聚合到oldExchange,即使大小大于5。如果不聚合该消息到oldExchange,聚合完成,新的聚合开始。如何确保上次聚合拒绝的这条消息将成为新聚合中考虑的第一条消息?

由于输入文件可能非常大,我将使用流式传输,而不是先读取整个文件,然后通过自定义 bean 将其剪切为单个消息。

最佳答案

为什么不只将相关元素组合在一起?您当前正在使用 .constant(true) 进行聚合,这意味着只有一个相关组需要聚合。相反,您可以执行以下操作:

.split().tokenize("\n").streaming()
.process(e -> ...) //extract the type (AAA, BBB, etc.) into a header called type
.aggregate(header("type"), new MyAggregationStrtegy())
.completionTimeout(5000)

这样,只有相关的消息才会成为聚合批处理的一部分。

关于java - Camel AggregationStrategy 为下一次聚合迭代保留排除的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38997265/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com