gpt4 book ai didi

java - 动态生成具有入站 channel 和响应 channel 的 TCP 客户端

转载 作者:行者123 更新时间:2023-11-30 02:08:16 41 4
gpt4 key购买 nike

我是 Spring 集成的新手。

使用 Spring 4,仅使用 java 注解。

我现在工作的项目我们在属性文件中设置了 tcp 连接。

目前它被硬编码为只有 2 个不同的连接,并且必须更改为更动态的方法,我们可以在属性文件中设置可变数量的连接,并能够在运行时添加新连接。

我知道 dynamic tcp client example 的存在,并尝试将我的工作建立在它的基础上。

首先我们为连接设置以下 bean:

@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
env.getProperty("socket.tcp.nodes[0].ip"),
env.getProperty("socket.tcp.nodes[0].port", Integer.class)
);

tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);

final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);

tcpNetClientConnectionFactory.setSerializer(by);
tcpNetClientConnectionFactory.setDeserializer(by);
return tcpNetClientConnectionFactory;
}

然后我们有等待发送内容的适配器:

@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
@Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
return adapter;
}

当调用 fromTcp() 时,它会转换消息,并且以下代码将其发送到另一个应用程序以进行进一步处理。

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}

处理消息后,我们必须发送响应。

@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
return tcpSendingMsgHandler;
}

并使用网关:

@MessagingGateway()
public interface MessageTcpGateway {

@Gateway(requestChannel = "toTcpCh01")
ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}

我们将其退回。

通过示例,我可以了解如何动态创建响应流。

但我无法理解如何创建一个公共(public)连接池,然后根据这些连接工厂动态创建监听适配器和响应适配器,然后在运行时关闭/删除它们。

多亏了this question,我在某种程度上了解了如何使用入站适配器创建流程

我需要为每个适配器创建多个单独的 IntegrationFlow 吗?所以所有的调用和响应都可以异步处理(我对异步的看法可能是错误的)

然后在想要关闭连接时单独处理它们?就像调用 close 到 TcpReceivingChannelAdapter 然后调用 TcpSendingMessageHandler 最后取消注册 connectonfactory 一样?

最佳答案

我不这么认为Collaborating Channel Adapters您需要为 TcpReceivingChannelAdapterTcpSendingMessageHandler 单独定义 IntegrationFlow。它确实可以作为单个 IntegrationFlow 来完成,从 TcpReceivingChannelAdapter 开始,到 TcpSendingMessageHandler 结束。要点是,IntegrationFlow 本身只是一个对组件引用进行分组的逻辑容器。艰苦的工作实际上是由您在那里声明的所有组件完成的,并且通过这个 TcpReceivingChannelAdapterTcpSendingMessageHandler 以及之间的网关,您真的将实现异步。

请记住,ByteArrayLengthHeaderSerializer 也必须声明为 bean。不确定每个动态流是否需要一个单独的实例,但这里有一个 API 可以从那里执行此操作:

    /**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);

关于java - 动态生成具有入站 channel 和响应 channel 的 TCP 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50874121/

41 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com