gpt4 book ai didi

java - spring 集成中的自定义并发 TcpOutboundGateway

转载 作者:可可西里 更新时间:2023-11-01 02:53:52 46 4
gpt4 key购买 nike

我正在尝试使用 TcpOutboundGateway 和客户端 TcpConnectionFactory 与外部 TCP 服务器通信。在我的场景中,每个连接都应该与不同的线程相关联(线程上的每个连接可能用于多个请求/响应)。

所以我使用了这个主题中的 ThreadAffinityClientConnectionFactory:Spring Integration tcp client multiple connections

它工作正常,直到我尝试打开超过 4 个并发连接,第五个(及以上)连接因超时而失败。我发现 org.springframework.integration.ip.tcp.TcpOutboundGatewayhandleRequestMessage 方法中使用信号量来获取连接,所以我覆盖了 TcpOuboundGateway像这样:

public class NoSemaphoreTcpOutboundGateway extends TcpOutboundGateway {

private volatile AbstractClientConnectionFactory connectionFactory;
private final Map<String, NoSemaphoreTcpOutboundGateway.AsyncReply> pendingReplies = new ConcurrentHashMap();

@Override
public boolean onMessage(Message<?> message) {
String connectionId = (String)message.getHeaders().get("ip_connectionId");
if(connectionId == null) {
this.logger.error("Cannot correlate response - no connection id");
this.publishNoConnectionEvent(message, (String)null, "Cannot correlate response - no connection id");
return false;
}

if(this.logger.isTraceEnabled()) {
this.logger.trace("onMessage: " + connectionId + "(" + message + ")");
}

NoSemaphoreTcpOutboundGateway.AsyncReply reply = (NoSemaphoreTcpOutboundGateway.AsyncReply)this.pendingReplies.get(connectionId);
if(reply == null) {
if(message instanceof ErrorMessage) {
return false;
} else {
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
this.logger.error(errorMessage);
this.publishNoConnectionEvent(message, connectionId, errorMessage);
return false;
}
} else {
reply.setReply(message);
return false;
}

}

@Override
protected Message handleRequestMessage(Message<?> requestMessage) {
connectionFactory = (AbstractClientConnectionFactory) this.getConnectionFactory();
Assert.notNull(this.getConnectionFactory(), this.getClass().getName() + " requires a client connection factory");

TcpConnection connection = null;
String connectionId = null;

Message var7;
try {
/*if(!this.isSingleUse()) {
this.logger.debug("trying semaphore");
if(!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
}

haveSemaphore = true;
if(this.logger.isDebugEnabled()) {
this.logger.debug("got semaphore");
}
}*/

connection = this.getConnectionFactory().getConnection();
NoSemaphoreTcpOutboundGateway.AsyncReply e = new NoSemaphoreTcpOutboundGateway.AsyncReply(10000);
connectionId = connection.getConnectionId();
this.pendingReplies.put(connectionId, e);
if(this.logger.isDebugEnabled()) {
this.logger.debug("Added pending reply " + connectionId);
}

connection.send(requestMessage);

//connection may be closed after send (in interceptor) if its disconnect message
if (!connection.isOpen())
return null;

Message replyMessage = e.getReply();
if(replyMessage == null) {
if(this.logger.isDebugEnabled()) {
this.logger.debug("Remote Timeout on " + connectionId);
}

this.connectionFactory.forceClose(connection);
throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
}

if(this.logger.isDebugEnabled()) {
this.logger.debug("Response " + replyMessage);
}

var7 = replyMessage;
} catch (Exception var11) {
this.logger.error("Tcp Gateway exception", var11);
if(var11 instanceof MessagingException) {
throw (MessagingException)var11;
}

throw new MessagingException("Failed to send or receive", var11);
} finally {
if(connectionId != null) {
this.pendingReplies.remove(connectionId);
if(this.logger.isDebugEnabled()) {
this.logger.debug("Removed pending reply " + connectionId);
}
}
}
return var7;
}

private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
if(applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
}
}

private final class AsyncReply {
private final CountDownLatch latch;
private final CountDownLatch secondChanceLatch;
private final long remoteTimeout;
private volatile Message<?> reply;

private AsyncReply(long remoteTimeout) {
this.latch = new CountDownLatch(1);
this.secondChanceLatch = new CountDownLatch(1);
this.remoteTimeout = remoteTimeout;
}

public Message<?> getReply() throws Exception {
try {
if(!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
return null;
}
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
for(boolean waitForMessageAfterError = true; this.reply instanceof ErrorMessage; waitForMessageAfterError = false) {
if(!waitForMessageAfterError) {
if(this.reply.getPayload() instanceof MessagingException) {
throw (MessagingException)this.reply.getPayload();
}

throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
}
NoSemaphoreTcpOutboundGateway.this.logger.debug("second chance");
this.secondChanceLatch.await(2L, TimeUnit.SECONDS);
}
return this.reply;
}

public void setReply(Message<?> reply) {
if(this.reply == null) {
this.reply = reply;
this.latch.countDown();
} else if(this.reply instanceof ErrorMessage) {
this.reply = reply;
this.secondChanceLatch.countDown();
}
}
}
}

SpringContext的配置是这样的:

@Configuration
@ImportResource("gateway.xml")
public class Conf {

@Bean
@Autowired
@ServiceActivator(inputChannel = "clientOutChannel")
public NoSemaphoreTcpOutboundGateway noSemaphoreTcpOutboundGateway(ThreadAffinityClientConnectionFactory cf, DirectChannel clientInChannel){
NoSemaphoreTcpOutboundGateway gw = new NoSemaphoreTcpOutboundGateway();
gw.setConnectionFactory(cf);
gw.setReplyChannel(clientInChannel);
gw.setRequestTimeout(10000);
return gw;
}

<int-ip:tcp-connection-factory
id="delegateCF"
type="client"
host="${remoteService.host}"
port="${remoteService.port}"
single-use="true"
lookup-host="false"
ssl-context-support="sslContext"
deserializer="clientDeserializer"
serializer="clientSerializer"
interceptor-factory-chain="clientLoggingTcpConnectionInterceptorFactory"
using-nio="false"/>

delegateCF 被传递给ThreadAffinityClientConnectionFactory 构造函数

那么,问题是:

  • 就并发性而言,可以将 NoSemaphoreTcpOutboundGatewayThreadAffinityClientConnectionFactory 结合使用吗?

最佳答案

看起来你走对了,但同时我认为你不需要自定义 TcpOutboundGateway信号量 逻辑基于:

if (!this.isSingleUse) {
logger.debug("trying semaphore");
if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
}

同时看看Gary对ThreadAffinityClientConnectionFactory的解决方案:

@Bean
public TcpNetClientConnectionFactory delegateCF() {
TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234);
clientCF.setSingleUse(true); // so each thread gets his own connection
return clientCF;
}

@Bean
public ThreadAffinityClientConnectionFactory affinityCF() {
return new ThreadAffinityClientConnectionFactory(delegateCF());
}

关注评论。您只需要委托(delegate) isSingleUse()

关于java - spring 集成中的自定义并发 TcpOutboundGateway,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41166718/

46 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com