gpt4 book ai didi

java - 如何使用 Spring Cloud Stream 应用程序启动器 TCP 处理消息

转载 作者:行者123 更新时间:2023-12-01 08:51:45 26 4
gpt4 key购买 nike

我想使用 Spring Cloud Stream App Starter TCP Source project (maven 工件)以便能够通过套接字/端口接收 TCP 消息,处理它们,然后将结果推送到消息代理(例如 RabbitMQ)。

这个 TCP 源项目似乎完全符合我的要求,但它会自动将收到的消息发送到输出 channel 。那么,是否有一种干净的方法仍然使用 TCP 源项目,但拦截 TCP 传入消息以在内部转换它们,然后将它们输出到我的消息代理?

最佳答案

参见aggregation .

您使用源和处理器创建聚合应用。

Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:

sources, sinks, processors ...

They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, it can contain an arbitrary number of processors and must end with either a processor or a sink.

编辑

作为解决源 Autowiring 问题的方法,您可以尝试...

@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpSourceProperties.class)
public class MyTcpSourceConfiguration {

@Autowired
private Source channels;

@Autowired
private TcpSourceProperties properties;

@Bean
public TcpReceivingChannelAdapter adapter(
@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannelName("toMyProcessor");
return adapter;
}

@ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT)
public byte[] myProcessor(byte[] fromTcp) {
...
}

@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
@Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception {
TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}

@Bean
public EncoderDecoderFactoryBean tcpSourceDecoder() {
EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}

}

关于java - 如何使用 Spring Cloud Stream 应用程序启动器 TCP 处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42346872/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com