gpt4 book ai didi

amqp - Spring 集成 MessageTransformationException 转换消息失败

转载 作者:行者123 更新时间:2023-12-04 04:34:22 27 4
gpt4 key购买 nike

我正在尝试使用 spring integration v2.2.6.RELEASE 运行这个 ( https://github.com/cloudfoundry-samples/wgrus/tree/master/spring-integration ) wgrus 示例应用程序并不断收到 MessageTransformationException。该应用程序分为 3 个组件 - 商店 UI、库存服务和运输服务(此服务与此处无关)并使用 AMQP 协议(protocol)将消息发送到 RabbitMQ。简而言之,您在 UI 中输入订单详细信息,然后以消息的形式将其发送到订单 channel 。在后台,应用程序执行声明检查,将消息存储在 Mongo 中(我可以看到它已存储)并返回一条新消息,其有效负载是存储消息的 ID - 到目前为止一切顺利,消息已存储我可以看到 amqpOut channel 返回存储消息的 ID,即 826bcbfb-21fa-424d-aecd-0bab3d1a690b - 这可以在我的问题底部显示的调试中看到。接下来,消息通过出站 AMQP channel 发送到 RabbitMQ,并且应该由库存服务拾取,此时抛出异常。我可以看到,在调试打印 Payload=???sr?java.util.UUID????m?/

时,存储消息的 ID 在某些时候发生了一些变化

有没有人遇到过这个问题并且知道如何解决它?

商店 UI 使用的配置:

<int:object-to-json-transformer input-channel="orderChannel" output-channel="jsonOrders" content-type="text/x-json"/>

<int:claim-check-in input-channel="jsonOrders" output-channel="amqpOut" message-store="messageStore"/>

<amqp:outbound-channel-adapter id="amqpOut" amqp-template="rabbitTemplate" routing-key="orders"/>
...

库存服务使用的配置:

<int:claim-check-out message-store="messageStore" input-channel="orderChannel" output-channel="inventoryChannel" remove-message="true" />

<amqp:inbound-channel-adapter channel="orderChannel"
connection-factory="rabbitConnectionFactory"
queue-names="orders"
error-channel="errorLogger" />

<int:logging-channel-adapter id="errorLogger" log-full-message="true" level="INFO"/>

<int:chain input-channel="inventoryChannel" output-channel="amqpOut">
<int:json-to-object-transformer type="org.wgrus.Order"/>
<int:enricher request-channel="creditCheck">
<int:property name="approved" expression="payload.startsWith('OK')"/>
</int:enricher>
<int:enricher request-channel="inventoryRouter">
<int:property name="reserved" expression="payload"/>
</int:enricher>
<int:object-to-json-transformer content-type="text/x-json" />
<int:claim-check-in/>
</int:chain>
...

调试:

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'amqpOut', message: [Payload=826bcbfb-21fa-424d-aecd-0bab3d1a690b][Headers={timestamp=1384617973920, id=e98cd0c1-bd8c-4786-be31-1e77b0200934, content-type=text/x-json}]

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'jsonOrders', message: [Payload={"id":1,"approved":false,"reserved":false,"customerId":"","quantity":1,"productId":"widget"}][Headers={timestamp=1384617973852, id=826bcbfb-21fa-424d-aecd-0bab3d1a690b, content-type=text/x-json}]

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'orderChannel', message: [Payload=Order #1: 1 widgets for ][Headers={timestamp=1384617973824, id=1b700a4b-35e1-4d16-8ca0-7cd20ccfb85e}]

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[content-type] WILL be mapped, matched pattern=content-type

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=amqp_receivedRoutingKey

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=amqp_deliveryMode

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=amqp_redelivered

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=amqp_deliveryTag

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[content-type] WILL be mapped, matched pattern=content-type

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.channel.DirectChannel - preSend on channel 'orderChannel', message: [Payload=???sr?java.util.UUID????m?/?J?leastSigBitsJ?mostSigBitsxp???=i?k??!?][Headers={timestamp=1384617974260, id=81a2fb77-0f1e-4be7-9148-84da86a30ed8, content-type=text/x-json, amqp_receivedRoutingKey=orders, amqp_deliveryMode=PERSISTENT, amqp_redelivered=false, amqp_deliveryTag=1}]

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#1 received message: [Payload=???sr?java.util.UUID????m?/?J?

最佳答案

那么,您的问题就在这里:

<int:object-to-json-transformer input-channel="orderChannel" output-channel="jsonOrders" content-type="text/x-json"/>

你能解释一下设置content-type的原因是什么吗? , 如果你在那之后使用 <claim-check-in> ,谁刚刚返回UUID - 存储消息的 ID?

发生了什么事?

你的 UUIDSimpleMessageConverter 转换为序列化字节这个转换器集 contentType作为application/x-java-serialized-objectMessageProperties .但在那之后叫做AmqpHeaderMapper ,谁改变了contentType用你的值(value)text/x-json来自 MessageHeaders .这对 Producer 来说很好。

但 Consumer 无法正确转换 Body,因为这里也有效 SimpleMessageConverter默认。它会检查 contentType.startsWith("text") .只需从序列化的字节 UUID 创建简单的字符串.

希望一切都清楚

更新

不幸的是<object-to-json-transformer>content-type标题无论如何,到application/json默认。

为了防止它,你应该像这样配置content-type="" .

关于amqp - Spring 集成 MessageTransformationException 转换消息失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20021451/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com