
error-handling - Spring Integration error channel not invoked with a message-driven channel adapter

Reposted. Author: 行者123. Updated: 2023-12-03 07:59:28

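Note: because of the dependency requirements of the project I am working on, I am using Spring Integration 3.0.5.RELEASE.

I have configured a connection factory with a redelivery policy (we are using ActiveMQ 5.7):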

<amq:connectionFactory id="cms.jmsConnectionFactory"
brokerURL="${cms.jms.brokerURL}" p:redeliveryPolicy-ref="cms.jms.redeliveryPolicy"
p:prefetchPolicy-ref="cms.jms.prefetchPolicy" p:userName="${cms.jms.userName}" p:password="${cms.jms.password}"/>


<amq:redeliveryPolicy id="cms.jms.redeliveryPolicy"
p:backOffMultiplier="5"
p:initialRedeliveryDelay="1000"
p:maximumRedeliveries="6"
p:redeliveryDelay="1000"
p:useExponentialBackOff="false"
p:useCollisionAvoidance="false" />

I have a service activator that is activated by messages arriving on a message-driven channel adapter:
<int-jms:message-driven-channel-adapter channel="cms.jms.archiveNodeChannel"
connection-factory="cms.jms.cachedConnectionFactory" destination="cms.jms.archiveNodeDestination" transaction-manager="cms.jms.txManager"
acknowledge="transacted" concurrent-consumers="1" max-concurrent-consumers="1" cache-level="3"/>

<int:service-activator ref="cms.int.nodeArchiver" method="archiveNode" input-channel="cms.jms.archiveNodeChannel"/>

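In some cases the service activator may throw a RuntimeException. The behaviour I want is for the message to be redelivered 6 times, as set by the maximumRedeliveries property of the redelivery policy, and then for an error handler to be invoked at the Spring Integration layer. What actually happens when the RuntimeException is thrown is that the message is redelivered continuously until I kill the application. I tried adding an error channel to the message-driven channel adapter, but it is never activated.

This post is a bit old, but it suggests that in the JMS message-driven scenario the DefaultMessageListenerContainer is the "caller" and that a RuntimeException is only logged (in the AbstractMessageListenerContainer base class).

It may be that I have to extend DefaultMessageListenerContainer to handle RuntimeExceptions differently, but I would like to know whether anyone can point me to a more elegant solution or tell me that my approach is completely wrong.

--- JMS logging (I have indented it to make it more readable). The log shows the details from the first error, "No file type was found for file 9789814316385.txt", to the next occurrence of the same error (this trace repeats indefinitely until the system is shut down). Note that redeliveryCounter is always 0; I am not sure why that is.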
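As a rough illustration of the behaviour asked for above, the following plain-Java sketch models "deliver once, redeliver up to maximumRedeliveries times with a fixed delay, then hand over to an error handler". It does not call ActiveMQ or Spring; the names RedeliverySketch, Outcome and deliver are hypothetical and exist only for this example, and the delays are recorded rather than slept.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the desired behaviour; not ActiveMQ or Spring API. */
final class RedeliverySketch {

    /** Hypothetical result holder for this sketch. */
    static final class Outcome {
        final int deliveries;               // how many times the handler was called
        final boolean errorHandlerInvoked;  // true once redeliveries are exhausted
        final List<Long> delaysMillis;      // delay recorded before each redelivery

        Outcome(int deliveries, boolean errorHandlerInvoked, List<Long> delaysMillis) {
            this.deliveries = deliveries;
            this.errorHandlerInvoked = errorHandlerInvoked;
            this.delaysMillis = delaysMillis;
        }
    }

    /**
     * Calls the handler until it succeeds or until the number of redeliveries
     * reaches maximumRedeliveries. The delay is fixed, which mirrors
     * useExponentialBackOff="false" in the configuration shown earlier.
     */
    static Outcome deliver(Runnable handler, int maximumRedeliveries, long redeliveryDelayMillis) {
        List<Long> delays = new ArrayList<>();
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                handler.run();
                return new Outcome(deliveries, false, delays);
            } catch (RuntimeException e) {
                int redeliveriesSoFar = deliveries - 1;
                if (redeliveriesSoFar >= maximumRedeliveries) {
                    // Redeliveries exhausted: this is where an error handler would run.
                    return new Outcome(deliveries, true, delays);
                }
                delays.add(redeliveryDelayMillis); // recorded only, no real sleep
            }
        }
    }
}
```

With maximumRedeliveries set to 6 and a handler that always fails, this model calls the handler 7 times (the first delivery plus 6 redeliveries) before the error handler step is reached.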
ERROR  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Created JMS transaction on Session
[Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1]
org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, jms_timestamp=1437721454735, , jms_redelivered=false, ,
jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session:
ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage
{commandId = 0, responseRequired = false, messageId = null, originalDestination = null,
originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0,
properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454735, , jms_redelivered=false, ,
jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage]
from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }] of session
[Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage
{commandId = 4953, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:819, originalDestination = null,
originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode
transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1644, expiration = 0, timestamp = 1437721454772,
arrival = 0, brokerInTime = 1437721454773, brokerOutTime = 1437721454774, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@3ee1137b,
dataStructure = null, redeliveryCounter = 0, size = 0, properties =
{sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }]
to integration Message payload []
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel',
message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, ,
jms_redelivered=false, , , id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, ,
, jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] org.springframework.integration.jms.JmsSendingMessageHandler#4 received message:
[Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , ,
id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage
{commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null,
destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null,
redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775},
readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message:
[Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , ,
id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,
clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage]
from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }]
of session [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage
{commandId = 4959, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:820, originalDestination = null,
originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode,
transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1646, expiration = 0, timestamp = 1437721454775, arrival = 0, brokerInTime = 1437721454776,
brokerOutTime = 1437721454777, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@214372df,
dataStructure = null, redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }] to integration Message payload []
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel',
message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454775, , jms_redelivered=false, , ,
id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, timestamp=1437721454778}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] ServiceActivator for
[org.springframework.integration.handler.MethodInvokingMessageProcessor@755cd5ee] received message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1,
, , jms_timestamp=1437721454775, , jms_redelivered=false, , , id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820,
timestamp=1437721454778}]
ERROR [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt

After attaching a debugger, I can see why the exception thrown by the service activator is not propagated.

The problem occurs in org.springframework.integration.dispatcher.UnicastingDispatcher:
private boolean doDispatch(Message<?> message) {
    boolean success = false;
    Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
    if (!handlerIterator.hasNext()) {
        throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
    }
    List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
    while (success == false && handlerIterator.hasNext()) {
        MessageHandler handler = handlerIterator.next();
        try {
            handler.handleMessage(message);
            success = true; // we have a winner.
        }
        catch (Exception e) {
            RuntimeException runtimeException = (e instanceof RuntimeException)
                    ? (RuntimeException) e
                    : new MessageDeliveryException(message,
                            "Dispatcher failed to deliver Message.", e);
            if (e instanceof MessagingException &&
                    ((MessagingException) e).getFailedMessage() == null) {
                ((MessagingException) e).setFailedMessage(message);
            }
            exceptions.add(runtimeException);
            this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
        }
    }
    return success;
}

The doDispatch method shown above loops over each MessageHandler returned by Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message); and stops as soon as one of them handles the message successfully.

handlerIterator contains 2 MessageHandlers: the first is my service activator (which throws the RuntimeException), and the second is an org.springframework.integration.jms.JmsSendingMessageHandler (I don't know why that one is there).

When the service activator throws the exception, it is passed to the handleExceptions method shown below:
/**
 * Handles Exceptions that occur while dispatching. If this dispatcher has
 * failover enabled, it will only throw an Exception when the handler list
 * is exhausted. The 'isLast' flag will be <em>true</em> if the
 * Exception occurred during the final iteration of the MessageHandlers.
 * If failover is disabled for this dispatcher, it will re-throw any
 * Exception immediately.
 */
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
    if (isLast || !this.failover) {
        if (allExceptions != null && allExceptions.size() == 1) {
            throw allExceptions.get(0);
        }
        throw new AggregateMessageDeliveryException(message,
                "All attempts to deliver Message to MessageHandlers failed.", allExceptions);
    }
}

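Because the service activator is not the last handler (isLast = false), the exception is not thrown. The second handler, the JmsSendingMessageHandler, then handles the message (although I don't know why), and the message just keeps being redelivered. Does anyone know why this JmsSendingMessageHandler is also trying to handle messages intended for the service activator?

Best answer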
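The failover behaviour described above can be reproduced without Spring. The sketch below is a simplified stand-in for the dispatch loop, not the library code: FailoverDispatchSketch and dispatch are hypothetical names, handlers are plain Consumer<String> instances, and a generic RuntimeException with suppressed causes stands in for AggregateMessageDeliveryException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of a unicasting dispatcher with failover; not Spring code. */
final class FailoverDispatchSketch {

    /**
     * Tries handlers in order and returns true as soon as one succeeds.
     * With failover enabled, an exception is rethrown only if it came from
     * the last handler; with failover disabled, it is rethrown immediately.
     */
    static boolean dispatch(String message, List<Consumer<String>> handlers, boolean failover) {
        List<RuntimeException> exceptions = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            boolean isLast = (i == handlers.size() - 1);
            try {
                handlers.get(i).accept(message);
                return true; // first successful handler wins
            } catch (RuntimeException e) {
                exceptions.add(e);
                if (isLast || !failover) {
                    if (exceptions.size() == 1) {
                        throw e;
                    }
                    RuntimeException aggregate =
                            new RuntimeException("All attempts to deliver the message failed.");
                    for (RuntimeException ex : exceptions) {
                        aggregate.addSuppressed(ex);
                    }
                    throw aggregate;
                }
                // Failover: swallow the exception and try the next handler.
            }
        }
        throw new IllegalStateException("Dispatcher has no subscribers");
    }
}
```

With a failing first handler and a second handler that puts the message back on the queue, the failover variant returns true and the caller never sees the exception. That would also be consistent with the log above, where each delivery carries a new jms_messageId (818, 819, 820) and redeliveryCounter stays at 0: the broker sees a freshly sent message each time rather than a redelivery.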

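<publish-subscribe-channel> is what you need here! You can add all of these components as subscribers to it, and they will be invoked in order: either in the order in which they are declared in the configuration, or according to their preconfigured order attribute.

Regarding error-handling - Spring Integration error channel not invoked with a message-driven channel adapter, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31596079/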
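To make the ordering idea concrete, here is a minimal plain-Java sketch of a publish-subscribe style channel in which every subscriber receives each message, sorted by an order value. It only illustrates the concept; PublishSubscribeSketch, Subscriber, subscribe and send are hypothetical names, not the Spring Integration API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Conceptual model of ordered publish-subscribe delivery; not Spring code. */
final class PublishSubscribeSketch {

    /** Hypothetical pairing of a handler with its order value. */
    static final class Subscriber {
        final int order;
        final Consumer<String> handler;

        Subscriber(int order, Consumer<String> handler) {
            this.order = order;
            this.handler = handler;
        }
    }

    private final List<Subscriber> subscribers = new ArrayList<>();

    /** Registers a handler; lower order values are invoked first. */
    void subscribe(int order, Consumer<String> handler) {
        subscribers.add(new Subscriber(order, handler));
        // List.sort is stable, so equal orders keep their registration order.
        subscribers.sort(Comparator.comparingInt((Subscriber s) -> s.order));
    }

    /** Delivers the message to every subscriber in order. */
    void send(String message) {
        for (Subscriber s : subscribers) {
            s.handler.accept(message);
        }
    }
}
```

In this sketch, unlike the failover loop shown in the question, every subscriber is called for each message, and an exception thrown by a subscriber propagates to the caller of send instead of being passed on to the next handler.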
