gpt4 book ai didi

java - Spring 集成使用 1 个 channel 用于两个不同的进程

转载 作者:行者123 更新时间:2023-12-04 08:08:29 25 4
gpt4 key购买 nike

我有一个问题,我无法设置并行进入两个 channel 的进程,它要么是一个,要么是另一个。我有一个 ServiceActivator和一个 Transformer具有相同输入 channel 的 objectOutputChannel .去后processChannel它应该去两个,但它只去其中一个,并且每个请求都不同。我该怎么做才能让它按照我想要的方式工作?

    @Bean
public TcpReceivingChannelAdapter channelAdapter(AbstractServerConnectionFactory connectionFactory) {
final MsgReceivingChannelAdapter adapter = new MsgReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannel(messageChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "outputChannel")
public TcpSendingMessageHandler messageHandler(AbstractServerConnectionFactory connectionFactory){
final MsgSendingMessageHandler handler = new MsgSendingMessageHandler();
handler.setConnectionFactory(connectionFactory);
return handler;
}

@Bean
@ServiceActivator(inputChannel = "objectOutputChannel")
public AmqpOutboundEndpoint objectOutboundEndpoint() {
return Amqp
.outboundAdapter(rabbitTemplate)
.exchangeName(objectExchange)
.get();
}

@Bean
@Transformer(inputChannel = "messageChannel", outputChannel = "loggingChannel")
public ObjectToStringTransformer loggingTransformer() {
return new ObjectToStringTransformer();
}

@ServiceActivator(inputChannel = "loggingChannel")
public void loggingService(String message) {
messageLogger.info(message);
}

@Bean
@Transformer(inputChannel = "messageChannel", outputChannel = "processChannel")
public ObjectDeserializer objectDeserializer() {
return new ObjectDeserializer();
}

@ServiceActivator(inputChannel = "processChannel", outputChannel = "objectOutputChannel")
public MyObject processService(MyObject object) {
return objectService.check(object);
}

@Bean
@Transformer(inputChannel = "objectOutputChannel", outputChannel = "outputChannel")
public ObjectSerializer objectSerializer() {
return new ObjectSerializer();
}

@Bean(name = "loggingChannel")
public MessageChannel loggingChannel() {
return new DirectChannel();
}

@Bean(name = "outputChannel")
public MessageChannel outputChannel() {
return new DirectChannel();
}

@Bean(name = "messageChannel")
public MessageChannel messageChannel() {
return new PublishSubscribeChannel();
}

@Bean(name = "objectOutputChannel")
public MessageChannel objectOutputChannel() {
return new PublishSubscribeChannel());
案例 A 的调试日志 - 不进入 objectOutboundEndpoint()然后进入 objectSerializer()
2021-02-09 11:23:40.221 DEBUG 8964 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:23:40.223 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:23:40.231 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=..]
2021-02-09 11:23:42.383 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.385 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectSerializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : bean 'objectTcpController.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler : bean 'messageHandler'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@38499e48' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.389 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'objectTcpController.messageHandler.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
案例 B 的调试日志 - 不进入 objectSerializer()然后进入 objectOutboundEndpoint()
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint : bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489' received message: GenericMessage [payload=...]
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_tcp_remotePort] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_connectionId] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_localInetAddress] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_address] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[history] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_hostname] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint : handler 'bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=..]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

最佳答案

您所描述的不是关于 objectOutputChannelmessageChannel因为它们都是 PublishSubscribeChannel类实例。你所描述的是一些DirectChannel使用其循环默认调度策略。但是,由于我们没有几个订阅者直接订阅您向我们展示的 channel ,因此问题出在其他地方。
建议您开启Message History并调查 org.springframework.integration 的调试日志类别以查看您的消息如何在流中传播。在那里你可以找出哪个 channel 是有罪的。
您在代码片段中显示的内容是正确的,并且看起来与您描述的内容无关...
更新
这是我们在您的日志中看到的:

o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
那么,您的 objectOutputChannel真的是 DirectChannel实例及其 bean 在 ObjectIntegrationConfiguration 中声明.如果该配置不是您在问题中显示的内容,那么您有一个“bean 定义覆盖”竞争条件和您的 PublishSubscribeChannelDirectChannel 覆盖.请修改您的项目配置并尝试找到您声明的其他位置 objectOutputChannel bean 角,扁 bean 。

关于java - Spring 集成使用 1 个 channel 用于两个不同的进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66099659/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com