gpt4 book ai didi

java - 骡子 ESB 3.4.0 CE : Redeliver message through TCP Endpoint upon Socket Exception

转载 作者:可可西里 更新时间:2023-11-01 02:43:40 27 4
gpt4 key购买 nike

我有一个需要持久 TCP 连接的应用程序。所以我在 Tcp 连接器上设置了一些适当的属性来支持它。我遇到的问题是,如果外部应用程序关闭并重新启动,Mule 将继续尝试将消息发送到套接字,而不是尝试重新建立连接。

<mule ...>
<tcp:connector name="tcpConn" doc:name="TCP connector" keepAlive="true" keepSendSocketOpen="true" reuseAddress="true" validateConnections="true">
<reconnect-forever frequency="2000" />
<tcp:direct-protocol payloadOnly="true"/>
</tcp:connector>
<flow name="ExampleFlow1" doc:name="ExampleFlow1">
<vm:inbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
<vm:transaction action="ALWAYS_BEGIN"/>
</vm:inbound-endpoint>
<tcp:outbound-endpoint exchange-pattern="one-way" host="127.0.0.1" port="50002" responseTimeout="10000" doc:name="TCP"/>
</flow>

<flow name="example1" doc:name="example1">
<tcp:inbound-endpoint exchange-pattern="one-way" host="0.0.0.0" port="50001" responseTimeout="10000" doc:name="TCP"/>
<vm:outbound-endpoint exchange-pattern="one-way" path="msg.in" doc:name="VM">
</vm:outbound-endpoint>
</flow>
</mule>

这是我的错误日志:

ERROR 2014-04-30 17:11:35,436 [[chatroomexample].connector.VM.mule.default.receiver.04] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
name=tcpConn
lifecycle=start
this=198bdbc
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[tcp]
serviceOverrides=<none>
}
, name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[]
Type : org.mule.api.transport.DispatchException
Code : MULE_ERROR--2
Payload : [B@7cfbc3
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
********************************************************************************
Exception stack is:
1. Connection reset by peer: socket write error (java.net.SocketException)
java.net.SocketOutputStream:-2 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=tcp://127.0.0.1:50002, connector=TcpConnector
{
name=tcpConn
lifecycle=start
this=198bdbc
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[tcp]
serviceOverrides=<none>
}
, name='endpoint.tcp.127.0.0.1.50002', mep=ONE_WAY, properties={}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: byte[] (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.flush(Unknown Source)
at org.mule.transport.tcp.TcpMessageDispatcher.write(TcpMessageDispatcher.java:129)
at org.mule.transport.tcp.TcpMessageDispatcher.dispatchToSocket(TcpMessageDispatcher.java:122)
at org.mule.transport.tcp.TcpMessageDispatcher.doDispatch(TcpMessageDispatcher.java:50)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:44)
at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:52)
at org.mule.execution.TransactionalExecutionTemplate.execute(TransactionalExecutionTemplate.java:69)
at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor.process(EndpointTransactionalInterceptingMessageProcessor.java:56)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)
at org.mule.execution.Messa...
********************************************************************************

思路是这样的:

  1. 一个简单的 TCP 应用程序,它在端口 50001 连接到 Mule ESB 并发送一条 hello world 消息。
  2. Mule ESB 接收消息并将其放入 VM 队列中。 VM 队列是因为我希望实现一个可靠性消息传递模式,如 Mule in Action,第 2 版中所讨论的那样。
  3. 我们将消息从队列中弹出并以事务方式将其发送到位于 127.0.0.1:50002 的出站端点,我在此处使用 netcat 类型的应用程序来简单地监听和回显消息。我设置交易是因为我需要确保消息已送达。
  4. 我们第一次启动所有应用程序时,消息通过。太棒了!
  5. 我重新启动 netcat 应用程序(从第 3 步开始)。
  6. 我在端口 50001 向 Mule ESB 发送了另一条消息,现在我得到了上述异常。

这是我尝试过的:

  1. 在 TCP 连接器中设置重新连接策略,但我不理解它或者它没有执行它应该执行的操作,即尝试重新连接到外部 TCP 应用程序。
  2. 使用 Until Successful 处理范围。我们重新尝试传递,直到重试次数用完,但我没有观察到任何重新连接到外部 TCP 应用程序的尝试。我们一直在向破损的管道写入数据。
  3. 没有交易。这会产生在打印堆栈跟踪后丢弃消息的预期结果。

此功能是否支持开箱即用,或者我需要说,实现自定义 RetryPolicy 和 RetryPolicyTemplate 以应用于我的 TCP 连接器?如果我需要实现一个自定义的,我将如何获得损坏的套接字并重新建立连接?

TIA!

最佳答案

我解决了这个问题...有点...通过创建我自己的自定义 TcpConnector、TcpMessageDispatcher 和 TcpMessageDispatcherFactory 类,根据 Ian Gil Ragudo's suggestion

下面是我定义连接器的方式:

    <spring:bean id="dispatcherFactory" class="com.mycompany.mule.tcp.MyTcpMessageDispatcherFactory"/>
<spring:bean id="protocol" class="org.mule.transport.tcp.protocols.DirectProtocol"/>
<custom-connector name="myTcpConn" class="com.mycompany.mule.tcp.MyTcpConnector" >
<spring:property name="dispatcherFactory" ref="dispatcherFactory" />
<spring:property name="tcpProtocol" ref="protocol"/>
<spring:property name="keepAlive" value="true"/>
<spring:property name="keepSendSocketOpen" value="true"/>
<spring:property name="reuseAddress" value="true"/>
</custom-connector>

<tcp:endpoint connector-ref="myTcpConn" exchange-pattern="one-way" host="0.0.0.0" port="8081" name="TcpEndpoint" responseTimeout="10000" doc:name="TCP"/>

不幸的是,由于方法和实例变量的可见性,我不得不决定复制+粘贴类,更新强制转换,并且只在 MyTcpMessageDispatcher 类中编辑了这一点:

    @Override
protected synchronized void doDispatch(MuleEvent event) throws Exception
{
Socket socket = connector.getSocket(endpoint);
try
{
dispatchToSocket(socket, event);
}
catch (SocketException e)
{
System.err.println(e.toString());
socket.close();
throw e;
}
finally
{
connector.releaseSocket(socket, endpoint);
}
}

已知问题:当出站端点的外部应用程序出现故障,然后重新启动时,我似乎丢失了一条消息,即使配置了事务。在抛出套接字写入异常之前,它似乎仍然最后一次成功写入套接字。

关于java - 骡子 ESB 3.4.0 CE : Redeliver message through TCP Endpoint upon Socket Exception,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23398637/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com