gpt4 book ai didi

spring-integration - Spring 集成 : how to handle exceptions in services after an aggregator?

转载 作者:行者123 更新时间:2023-12-04 07:54:58 26 4
gpt4 key购买 nike

我有一个依赖 Spring Integration (4.0.4.RELEASE) 和 RabbitMQ 的应用程序。我的流程如下:

消息通过一个进程放入队列中(它们不期望得到任何答复):
网关 -> channel -> RabbitMQ

然后被另一个进程抽干:

RabbitMQ --1--> inbound-channel-adapter A --2--> chain B --3--> aggregator C --4--> service-activator D --5--> final service-activator E

Explanations & context

The specific thing is that nowhere in my application I am using a splitter: aggregator C just waits for enough messages to come, or for a timeout to expire, and then forwards the batch to service D. Messages can get stuck in aggregator C for quite a long time, and should NOT be considered as consumed there. They should only be consumed once service D successfully completes. Therefore, I am using MANUAL acknowledgement on inbound-channel-adapter A and service E is in charge of acknowledging the batch.

Custom aggregator

I solved the acknowledgement issue I had when set to AUTO by redefining the aggregator. Indeed, messages are acknowledged immediately if any asynchronous process occurs in the flow (see question here). Therefore, I switched to MANUAL acknowledgement and implemented the aggregator like this:

     <bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
<property name="inputChannel" ref="channel3"/>
<property name="handler">
<bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
<constructor-arg name="processor">
<bean class="com.test.AMQPAggregator"/>
</constructor-arg>
<property name="correlationStrategy">
<bean class="com.test.AggregatorDefaultCorrelationStrategy" />
</property>
<property name="releaseStrategy">
<bean class="com.test.AggregatorMongoReleaseStrategy" />
</property>
<property name="messageStore" ref="messageStoreBean"/>
<property name="expireGroupsUponCompletion" value="true"/>
<property name="sendPartialResultOnExpiry" value="true"/>
<property name="outputChannel" ref="channel4"/>
</bean>
</property>
</bean>

<bean id="messageStoreBean" class="org.springframework.integration.store.SimpleMessageStore"/>

<bean id="messageStoreReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore" />
<property name="timeout" value="${myapp.timeout}" />
</bean>

<task:scheduled-tasks>
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>

我确实想以不同的方式聚合 header ,并保留所有 amqp_deliveryTag 的最高值,以便以后在 中进行多重确认。服务 E (见 this 线程)。到目前为止,这非常有效,除了它比典型的聚合器命名空间(请参阅 this 旧 Jira 票证)冗长得多。

服务

我只是使用基本配置:

链条-B
<int:chain input-channel="channel2" output-channel="channel3">
<int:header-enricher>
<int:error-channel ref="errorChannel" /> // Probably useless
</int:header-enricher>
<int:json-to-object-transformer/>
<int:transformer ref="serviceABean"
method="doThis" />
<int:transformer ref="serviceBBean"
method="doThat" />
</int:chain>

服务-D
<int:service-activator  ref="serviceDBean"
method="doSomething"
input-channel="channel4"
output-channel="channel5" />

错误管理

由于我依赖 MANUAL 确认,因此我还需要手动拒绝消息,以防发生异常。我对 有以下定义入站 channel 适配器 A :
<int-amqp:inbound-channel-adapter   channel="channel2"
queue-names="si.queue1"
error-channel="errorChannel"
mapped-request-headers="*"
acknowledge-mode="MANUAL"
prefetch-count="${properties.prefetch_count}"
connection-factory="rabbitConnectionFactory"/>

我对 使用以下定义错误 channel :
<int:chain input-channel="errorChannel">
<int:transformer ref="errorUnwrapperBean" method="unwrap" />
<int:service-activator ref="amqpAcknowledgerBean" method="rejectMessage" />
</int:chain>

ErrorUnwrapper 基于 this代码和整个异常检测和消息拒绝运行良好,直到消息到达 聚合器 C .

问题

如果在处理 中的消息时引发异常服务激活剂 D ,然后我看到了这个异常,但是 错误 channel 似乎没有收到任何消息,也没有调用我的 ErrorUnwrapper unwrap() 方法。我在抛出 Exception("ahahah") 时看到的定制堆栈跟踪如下:
2014-09-23 16:41:18,725 ERROR o.s.i.s.SimpleMessageStore:174: Exception in expiry callback
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
(...)

Caused by: java.lang.Exception: ahahaha
at com.myapp.ServiceD.doSomething(ServiceD.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
(...)

2014-09-23 16:41:18,733 ERROR o.s.s.s.TaskUtils$LoggingErrorHandler:95: Unexpected error occurred in scheduled task.
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
(...)



如何告诉处理来自此类聚合器的消息的服务将错误发布到 错误 channel ?我试图通过 header-enricher 在 header 中指定错误 channel ,但没有运气。我使用的是默认 错误 channel 定义,但我也尝试更改其名称并重新定义它。我在这里一无所知,即使我找到了 thisthat ,我还没有设法让它工作。在此先感谢您的帮助!

最佳答案

正如您通过 StackTrace 看到的,您的进程是从 MessageGroupStoreReaper 开始的。线程,从默认 ThreadPoolTaskScheduler 启动.

因此,您必须为此提供一个自定义 bean:

<bean id="scheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="errorHandler">
<bean class="org.springframework.integration.channel.MessagePublishingErrorHandler">
<property name="defaultErrorChannel" ref="errorChannel"/>
</bean>
</property>
</bean>

<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>

但是我看到了拥有 error-channel 的好处在 <aggregator> ,我们真的有几个点来自不同的分离线程,我们无法正常处理。

关于spring-integration - Spring 集成 : how to handle exceptions in services after an aggregator?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25999096/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com