gpt4 book ai didi

java - 如果 ThreadPoolTask​​Executor 没有可用容量,则将消息保留在 ActiveMQ 队列中

转载 作者:行者123 更新时间:2023-11-30 04:20:55 25 4
gpt4 key购买 nike

我有两个 Java 进程,第一个进程生成消息并放置将它们放入 ActiveMQ 队列中。第二个进程(消费者)使用Spring集成以从队列获取消息并在线程中处理它们。

我有两个要求:

  1. 消费者应该有 3 个处理线程。如果我有 10 条消息通过队列进入,我希望有 3 个线程处理前 3 个消息,其他7条消息应该被缓冲。

  2. 当消费者在某些消息尚未处理时停止时,它重新启动后应继续处理消息。

这是我的配置:

<bean id="messageActiveMqQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="example.queue" />
</bean>

<int-jms:message-driven-channel-adapter
destination="messageActiveMqQueue" channel="incomingMessageChannel" />

<int:channel id="incomingMessageChannel">
<int:dispatcher task-executor="incomingMessageChannelExecutor" />
</int:channel>

<bean id="incomingMessageChannelExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="daemon" value="false" />
<property name="maxPoolSize" value="3" />
</bean>

<int:service-activator input-channel="incomingMessageChannel"
ref="myMessageProcessor" method="processMessage" />

第一个要求按预期工作。我产生 10 条消息和 3myMessageProcessors 每个开始处理一条消息。当第一条消息发出后完成,第四条消息处理完成。

但是,当我在处理所有消息之前杀死消费者时,那些消息丢失。重启后,消费者不会收到这些消息再次。

我认为在上面的配置中这是因为由ThreadPoolTask​​Executor 将消息排队。所以消息已经被删除了来自传入消息 channel 。因此我尝试设置队列容量传入消息 channel 执行器:

<property name="queueCapacity" value="0" />

但是现在当我有超过 3 条消息时,我会收到错误消息:

2013-06-12 11:47:52,670 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'incomingMessageChannel'

我还尝试将消息驱动 channel 适配器更改为入站网关,但这给了我同样的错误。

我是否必须在入站网关中设置错误处理程序,以便错误返回到 ActiveMQ 队列?如果 ThreadPoolTask​​Executor 没有空闲线程,我该如何配置队列以便将消息保留在队列中?

提前致谢,

本尼迪克特

最佳答案

没有;您应该使用 <message-driven-channel-adapter/> 来控制并发,而不是使用执行程序 channel .

删除<dispatcher/>从 channel 并设置 concurrent-consumers="3"在适配器上。

关于java - 如果 ThreadPoolTask​​Executor 没有可用容量,则将消息保留在 ActiveMQ 队列中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17062776/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com