gpt4 book ai didi

tomcat - 将 Tomcat 与 ActiveMQ 和 Spring 的 DefaultMessageListenerContainer 集成 - 再次重新传递 JMS 消息

转载 作者:行者123 更新时间:2023-11-28 22:27:23 25 4
gpt4 key购买 nike

我有以下设置:带有嵌入式 ActiveMQ 的 Tomcat

我使用 Spring 集成的 JmsMessageDrivenChannelAdapter 创建适配器,它使用来自 ActiveMQ 队列的消息,如下所示:

 Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
.destination(destination)
.errorChannel(errorChannel)
.get();

TransactedMessageListenerContainer 在哪里

public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {

public TransactedMessageListenerContainer() {
this.setSessionTransacted(true);
}
}

如果发生异常,ActiveMQ broker 不会相应地重新传递消息。

当我使用 org.apache.activemq.broker.BrokerService 进行简单的集成测试时,JMS 消息被重新传递,即我可以实现重试机制

如何使用 ActiveMQ 为 Tomcat 实现同样的功能?

我在这里找到:http://activemq.apache.org/tomcat.html ,手动将 ActiveMQ 与 Tomcat 集成确实允许主题、队列和 ConnectionFactory 注入(inject),但不支持事务性发送和传递,但我不确定是否有一些解决方法

感谢您的帮助!

更新:我还在错误处理程序中重新抛出异常,如下所示:

@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(this::errorMessageHandler)
.get();
}

public void errorMessageHandler(Message<?> message) {
log.warn("handling error message");
log.warn("headers: " + message.getHeaders().toString());
log.warn("payload: " + message.getPayload().toString());
MessagingException exception = (MessagingException) message.getPayload();
log.warn("original payload: " + exception.getFailedMessage().getPayload());
throw exception; // make JMS broker redeliver
}

最佳答案

如果您想要回滚和重新传递,您的错误流必须重新抛出异常,而不是像默认 errorChannel 那样吞没。你可以在这里找到类似的问题和答案。

更新

嗯,不确定你的问题在哪里我有一个类似于你的测试用例:

    @Autowired
private MessageChannel errorChannel;

@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
.configureListenerContainer(c -> c.sessionTransacted(true))
.errorChannel(this.errorChannel)
.destination("jmsMessageDriver"))
.<String, String>transform(p -> {
throw new RuntimeException("intentional");
})
.get();
}

@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(m -> {
MessagingException exception = (MessagingException) m.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
throw exception;
})
.get();
}

在重新投递时,我在 failedMessage 中看到了这些 header :

"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"

是的,我们肯定会在测试中使用嵌入式 ActiveMQ。顺便说一句,JMS ConnectionFactory 是为 Spring Boot ActiveMQAutoConfiguration 提供的。

关于tomcat - 将 Tomcat 与 ActiveMQ 和 Spring 的 DefaultMessageListenerContainer 集成 - 再次重新传递 JMS 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37434573/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com