gpt4 book ai didi

error-handling - Spring 与 AMQP 集成 : get original message's custom-headers on error-channel

转载 作者:行者123 更新时间:2023-12-03 08:13:03 34 4
gpt4 key购买 nike

在我的项目中使用 Spring Integration with RabbitMQ 我遇到了一个问题。
该项目包括从队列接收消息、跟踪传入消息、使用服务激活器处理消息以及跟踪服务激活器引发的响应或异常。
这是示例配置:

<!-- inbound-gateway -->
<int-amqp:inbound-gateway id="inboundGateway"
request-channel="gatewayRequestChannel"
queue-names="myQueue"
connection-factory="rabbitMQConnectionFactory"
reply-channel="gatewayResponseChannel"
error-channel="gatewayErrorChannel"
error-handler="rabbitMQErrorHandler"
mapped-request-headers="traceId"
mapped-reply-headers="traceId" />

<!-- section to dispatch incoming messages to trace and execute service-activator -->
<int:publish-subscribe-channel id="gatewayRequestChannel" />
<int:bridge input-channel="gatewayRequestChannel" output-channel="traceChannel"/>
<int:bridge input-channel="gatewayRequestChannel" output-channel="serviceActivatorInputChannel"/>

<!-- the trace channel-->
<int:logging-channel-adapter id="traceChannel"
expression="headers['traceId'] + '= [Headers=' + headers + ', Payload=' + payload+']'" logger-name="my.logger" level="DEBUG" />

<!-- service activator which may throw an exception -->
<int:service-activator ref="myBean" method="myMethod" input-channel="serviceActivatorInputChannel" output-channel="serviceActivatorOutputChannel"/>

<!-- section to dispatch output-messages from service-activator to trace them and return them to the gateway -->
<int:publish-subscribe-channel id="serviceActivatorOutputChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="traceChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="gatewayResponseChannel" />

<!-- section to dispatch exceptions from service-activator to trace them and return them to the gateway -->
<int:bridge input-channel="gatewayErrorChannel"
output-channel="traceChannel" />
<int:bridge input-channel="gatewayErrorChannel"
output-channel="gatewayResponseChannel" />

我简化了代码以适合我的解释。这个想法是跟踪进出服务激活器的输入和输出/错误消息。为此,我使用了一个名为 traceId 的消息头。此标识符用作相关标识符,以便能够将请求消息与其响应相关联(这两个消息共享相同的 traceId 值)。

当服务激活器没有抛出异常时,一切正常。
但是当抛出异常时,网关似乎生成了一条新消息,没有我原来的 traceId header 。

稍微研究一下网关代码,我在 org.springframework.integration.gateway.MessagingGatewaySupport 类中找到了以下代码:
private Object doSendAndReceive(Object object, boolean shouldConvert) {
...
if (error != null) {
if (this.errorChannel != null) {
Message<?> errorMessage = new ErrorMessage(error);
Message<?> errorFlowReply = null;
try {
errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
}
...
}

似乎,当发生异常时,会创建一条新消息,并将异常消息作为有效负载并发送到网关的 errorChannel。这是我松开自定义标题的地方。

发生异常时,有没有办法保留我的自定义 header ? (也许有一种配置它的方法,我可能会错过它......)。或者,也许我没有以正确的方式实现我的流程。如果是这种情况,欢迎提出任何意见或建议。

顺便说一句,我使用的是 spring-integration-core 神器的 4.0.3.RELEASE 版本。

谢谢你的回答

编辑:正如 Gary Russel 所说,这个例子缺少以下推送/订阅队列配置
<int:publish-subscribe-channel id="gatewayErrorChannel"/>

最佳答案

error-channel上的消息是 ErrorMessage .它有两个属性:cause - 原始异常和failedMessage - 故障点的消息。 ErrorMessage没有得到 failedMessage的标题。

您不能只发送 ErrorMessage无需额外工作即可返回网关。

通常,错误流会在返回响应之前对错误进行一些分析。

如果要恢复某些自定义 header ,则需要在错误流上使用 header 丰富器。

就像是

<int:header-enricher ...>
<int:header name="traceId" expression="payload.failedMessage.headers.traceId" />
</int:header-enricher>

此外,您的配置有点奇怪,因为您在 gatewayErrorChannel 上有 2 个订阅者。 .除非是 <publish-subscribe-channel/> ,这些消费者将获得备用消息;似乎您希望他们都得到它,因此您需要正确声明 channel 。

关于error-handling - Spring 与 AMQP 集成 : get original message's custom-headers on error-channel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27852482/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com