gpt4 book ai didi

java - 当消息数大于并发消费者数时,如何消费 Spring IntegrationFlow 中所需的所有消息?

转载 作者:行者123 更新时间:2023-11-30 02:18:13 26 4
gpt4 key购买 nike

我的集成流程定义如下:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();

其目的是将“批量”发送的多条相关消息分组。这是一个例子:

// Message 1
{ "name": "Message1",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 1,
.... }

// Message 2
{ "name": "Message2",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 2,
.... }

由于描述的原因here我们对 RabbitMQ 使用手动 ack:ing 以避免丢失消息。

集成流程对于大小为 2 的批处理非常有效,但一旦批处理中的消息超过 2 条,我们就会遇到麻烦:

[my-service] 2017-12-04 17:46:07.966  INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2 
[my-service] 2017-12-04 17:46:09.976 INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3

请注意,记录的消息之间的时间大约为 2 秒(即我们配置为 groupTimeout)。

我怀疑其原因是 Spring 消耗了 2 条消息(不会自动 ack:ed),然后聚合等待第三条消息(因为在本例中 batchSize 为 3) 。但这条消息永远不会在 2 秒窗口内被消费,因为只有两个并发消费者。

concurrentConsumers 计数增加到 3 可解决这个特定问题。问题是我们不知道收到的批处理大小,它们可能非常大,可能有 50 个左右。这意味着简单地增加 concurrentConsumers 并不是一个可行的选择。

在 Spring 中处理这个问题的正确方法是什么?

最佳答案

正如我在 comments to this answer 中讨论的那样...

使用此模式时,并发 * 预取必须足够大,以包含所有未完成批处理的消息。

因此,我不赞成使用该模式,除非您有相当可预测的数据。

关于java - 当消息数大于并发消费者数时,如何消费 Spring IntegrationFlow 中所需的所有消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47651980/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com