gpt4 book ai didi

java - 如何在 'main' 线程上运行 DefaultMessageListenerContainer

转载 作者:行者123 更新时间:2023-11-30 11:45:06 24 4
gpt4 key购买 nike

我有一个案例,我想在同一个“主”线程中运行 DefaultMessageListenerContainer。现在它使用 SimpleAsyncTaskExecutor 每次收到消息时都会生成新线程。

我们有一个连接到不同分布式系统并进行处理的测试用例,最后它断言了一些东西。由于 DefaultMessageListenerContainer 在单独的线程中运行,主线程返回并在 DefaultMessageListenerContainer 完成之前开始执行断言。这会导致测试用例失败。作为解决方法,我们让主线程 hibernate 了几秒钟。

示例配置

  <int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="defaultMessageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />

<beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="taskExecutor" ref="taskExecutor"/>
<beans:property name="maxMessagesPerTask" value="10"/>
<beans:property name="sessionTransacted" value="true"/>
</beans:bean>

<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />

我在这里尝试使用 TimerTaskExecutor,因为它创建了单线程,但该线程与主线程是分开的,因此问题未解决。我尝试使用 SyncTaskExecutor 但这也不起作用(或者我可能提供了正确的属性值?)。

答案:
我们使用 SimpleMessageListenerContainer 解决了这个问题。这是新配置

     <int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="messageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />

<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="sessionTransacted" value="true"/>
<beans:property name="exposeListenerSession" value="false"/>
</beans:bean>

最佳答案

首先你必须明白本质上是异步的,不会阻塞。这意味着一旦您将消息发送到队列,它将由另一个线程处理,可能在不同的机器上,如果消费者关闭,可能会在几分钟或几小时后处理。

阅读您的测试用例描述,您似乎正在进行一些系统/集成测试。不幸的是,除了等待你无能为力,但是你不应该盲目地等待,因为这会使你的测试变慢但也不是很稳定 - 无论你等待多长时间,在繁忙的系统或一些冗长的 GC 过程中你的测试可能仍然需要时间out 即使没有错误。

因此,与其 hibernate 固定的秒数,不如 hibernate 一段时间。 ~100 毫秒并检查一些仅在消息处理完成后才满足的条件。例如,如果处理消息向数据库中插入一些记录,则定期检查数据库。

更优雅的方式(无需忙等待)是实现 request/repply pattern , 请参阅 How should I implement request response with JMS?有关实现细节。基本上在发送消息时,您定义一个回复队列并阻止等待该队列中的消息。原始消息处理完成后,消费者应向定义的队列发送回复消息。当您收到该消息时 - 执行所有断言。

关于java - 如何在 'main' 线程上运行 DefaultMessageListenerContainer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10479547/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com