gpt4 book ai didi

java - SimpleMessageListenerContainer 批量消息处理

转载 作者:行者123 更新时间:2023-11-30 03:07:14 28 4
gpt4 key购买 nike

我有一个传入数据流作为单独的消息发送到 RabbitMQ。

我想将这些发送到需要一批消息的服务。当我有一批 1000 条消息或 5 秒过期时,我需要将请求发送到服务。使用 SimpleMessageListenerContainer 可以吗?

SimpleMessageListenerContainer 支持事务,但这对 5 秒超时没有帮助。我确实查看了方法 doReceiveAndExecute(BlockingQueueConsumer Consumer) 和“receiveTimeout”,但由于此变量位于事务循环内,因此我最终可能会为每条消息等待 5 秒(1000*5 秒= 83 分钟)。

我目前有一个 channel 感知监听器,它将消息批处理到批量处理器中,该处理器将管理我的超时和队列长度。 SimpleMessageListenerContainer 设置为手动确认。然而,由于监听器在消息实际发送到服务之前返回,当我确实要确认消息(因为 channel 已关闭)时,我偶尔会遇到问题。

我考虑过编写自己的ListenerContainer,将整个BlockingQueueConsumer发送到Listener。这是唯一的解决方案还是有人已经成功做到了类似的事情?

最佳答案

您可以使用ChannelAwareMessageListener,设置acknowledgeMode=MANUAL;在监听器中累积交付;启动一个计时器(计划任务)在 +5 秒内执行并保留对 channel 的引用。当新的交货到达时,取消任务,将新的交货添加到集合中。

当 1000 个交付到达时(或计划任务触发);调用您的服务;然后使用channel.basicAck()(多个)来确认已处理的消息。

您将需要处理一些竞争条件,但这应该很容易。也许另一个批处理队列是最简单的,因为其他一些线程正在等待批处理到达该队列。

编辑

从 2.2 开始,SimpleMessageListenerContainer 支持 native 批量传送消息 - 请参阅 Batched Messages .

Starting with version 2.2, the SimpleMessageListeneContainer can be use to create batches on the consumer side (where the producer sent discrete messages).

Set the container property consumerBatchEnabled to enable this feature. deBatchingEnabled must also be true so that the container is responsible for processing batches of both types. Implement BatchMessageListener or ChannelAwareBatchMessageListener when consumerBatchEnabled is true. See @RabbitListener with Batching for information about using this feature with @RabbitListener.

关于java - SimpleMessageListenerContainer 批量消息处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34453305/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com