gpt4 book ai didi

java - 使用出站适配器配置断路器以处理连接超时问题

转载 作者:行者123 更新时间:2023-11-30 07:43:09 34 4
gpt4 key购买 nike

<int:service-activator input-channel="toKafka"  ref="conditionalProducerService" method="producerCircuitBreaker">

<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>

<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>


<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>

<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>

public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");}


通过以上配置,我们正在尝试:

1.期望将失败的消息传播到错误通道=“ failedChannel2”,这不会发生。因为我无法在控制台中看到转换后的输出。

2.CircuitBreaker正在为ServiceActivator工作(上面针对应用程序相关的异常),但是如何为出站适配器的失败案例配置CB。例如:当连接超时或服务器突然关闭/网络连接问题/某些环境问题,然后才将消息从SI通道发送到外部(kafka)服务器。在这种情况下,我们可以为CB配置出站适配器吗?

根据有关断路器建议的SI文档,请参见下文。

“通常,此建议可能会用于外部服务,否则可能会花费一些时间(例如
作为尝试建立网络连接的超时)”。

请提出实现方法的建议。非常感谢。

更新的配置:
        
    
    

        <int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>




<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">

</int:transformer>

public void transformFailed(Message<?> message) {
APPLOGGER.log("transformer message test" + message);


public class ProducerMessageHandler extends KafkaProducerMessageHandler{

public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}

@Override
public void handleMessageInternal(final Message<?> message) throws Exception {

//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}


日志:

01-05 @ 23:44:18,598调试org.springframework.integration.config.ServiceActivatorFactoryBean $ 1-org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6收到消息:GenericMessage [payload = hello,headers = {timestamp = 1452017658598,id = e0591162-3b93-9bb6-0699-89b15b20e904}]
  调试:-com.XXX.ProducerMessageHandler#0收到了消息:GenericMessage [payload = hello,标头= {timestamp = 1452017658598,id = e0591162-3b93-9bb6-0699-89b15b20e904}]
异常:org.springframework.messaging.MessageHandlingException:消息处理程序[com.XXX.ProducerMessageHandler#0]中发生错误;嵌套的异常是java.lang.RuntimeException:测试foo
01-05 @ 23:44:18,606调试org.springframework.integration.channel.PublishSubscribeChannel-在“ toKafka”频道上预先发送,消息:GenericMessage [payload = hello,标头= {timestamp = 1452017658605,id = 61597941-b2f8-314d- 141d-8f2c058dda4d}]
01-05 @ 23:44:18,606调试org.springframework.integration.config.ServiceActivatorFactoryBean $ 1-org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6收到消息:GenericMessage [payload = hello,headers = {timestamp = 1452017658605,id = 61597941-b2f8-314d-141d-8f2c058dda4d}]
  调试:-com.XXX.ProducerMessageHandler#0接收到消息:GenericMessage [有效载荷=你好,标头= {timestamp = 1452017658605,id = 61597941-b2f8-314d-141d-8f2c058dda4d}]
异常:org.springframework.messaging.MessageHandlingException:消息处理程序[com.XXX.ProducerMessageHandler#0]中发生错误;嵌套的异常是java.lang.RuntimeException:测试foo
01-05 @ 23:44:18,606调试org.springframework.integration.channel.PublishSubscribeChannel-在频道“ toKafka”上预先发送,消息:GenericMessage [payload = hello,标头= {timestamp = 1452017658606,id = 119afbf1-6104-feb1- eb44-f646aa932277}]
01-05 @ 23:44:18,606调试org.springframework.integration.config.ServiceActivatorFactoryBean $ 1-org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6收到消息:GenericMessage [payload = hello,headers = {timestamp = 1452017658606,id = 119afbf1-6104-feb1-eb44-f646aa932277}]
得到异常:org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误[org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套的异常是org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice $ CircuitBreakerOpenException:断路器为org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6打开
01-05 @ 23:44:18,606调试org.springframework.integration.channel.PublishSubscribeChannel-在频道“ toKafka”上预先发送,消息:GenericMessage [payload = hello,标头= {timestamp = 1452017658606,id = 8dafe2e0-8efe-c827- e745-1387e6045e7d}]
01-05 @ 23:44:18,606调试org.springframework.integration.config.ServiceActivatorFactoryBean $ 1-org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6收到消息:GenericMessage [payload = hello,headers = {timestamp = 1452017658606,id = 8dafe2e0-8efe-c827-e745-1387e6045e7d}]
得到异常:org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误[org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套的异常是org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice $ CircuitBreakerOpenException:断路器为org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6打开

最佳答案

该建议仅适用于分配给它的端点,而不适用于下游流。不幸的是,kafka模式不允许将其应用于出站通道适配器。我创建了一个JIRA issue for that

一种解决方法是将KafkaProducerMessageHandler配置为<bean/>,并从ref将其配置为<service-activator/>。然后,您可以应用断路器。

另一个解决方法是使用流入网关。

<int:service-activator ... ref="gw">
<int:request-handler-advice-chain ...

</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka"
default-reply-timeout="0"
error-channel="..." ... />


我不确定为什么您不会在错误通道中看到消息;通常,打开DEBUG日志记录将有助于调试这种情况。

编辑

我刚刚对此进行了测试,并且效果很好...

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>


编辑2

如果不使用网关,则可以使用队列通道和轮询器来处理它。这对我也很好...

<int:channel id="toKafka">
<int:queue />
</int:channel>

<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:poller error-channel="errorChannel" fixed-delay="1000" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
<property name="halfOpenAfter" value="12000"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>


或者,您可以添加中间流网关。

关于java - 使用出站适配器配置断路器以处理连接超时问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34373348/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com