gpt4 book ai didi

java - Spring Cloud Stream @ServiceActivator 在异常时不向 errorChannel 发送消息

转载 作者:行者123 更新时间:2023-12-01 01:47:09 25 4
gpt4 key购买 nike

我正在使用 spring-cloud-starter-stream-kafka 使用 spring cloud stream。我在 application.properties 中将我的 channel 绑定(bind)到 kafka 主题:

spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12

我无法让我的程序向错误 channel 生成异常消息。令人惊讶的是,它似乎甚至没有尝试生成它,即使我在不​​同的线程中(我有一个 @MessagingGateway 将消息转储到 gatewayOutput,然后流程的其余部分异步发生)。这是我的 ServiceActivator 的定义:

@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
FulfillingService {

@Override
@Audit(value = "annotatedEvent")
@ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
public void fulfill(TrivialRedemption redemption) throws Exception {

logger.error("FULFILLED!!!!!!");

throw new Exception("test exception");

}
}

这是生成的日志(我截断了完整的异常)。没有...

  • 提示errorChannel没有任何订阅者
  • Kafka 生产者线程日志记录
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}......2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}

EDIT: Here is the content of my channels class:

public interface Channels {

public static final String GATEWAY_OUTPUT = "gatewayOutput";

public static final String ENRICHING_INPUT = "enrichingInput";
public static final String ENRICHING_OUTPUT = "enrichingOutput";

public static final String REDEEMING_INPUT = "redeemingInput";
public static final String REDEEMING_OUTPUT = "redeemingOutput";

public static final String FULFILLING_INPUT = "fulfillingInput";
public static final String FULFILLING_OUTPUT = "fulfillingOutput";

@Output(GATEWAY_OUTPUT)
MessageChannel gatewayOutput();

@Input(ENRICHING_INPUT)
MessageChannel enrichingInput();

@Output(ENRICHING_OUTPUT)
MessageChannel enrichingOutput();

@Input(REDEEMING_INPUT)
MessageChannel redeemingInput();

@Output(REDEEMING_OUTPUT)
MessageChannel redeemingOutput();

@Input(FULFILLING_INPUT)
MessageChannel fulfillingInput();

@Output(FULFILLING_OUTPUT)
MessageChannel fulfillingOutput();

最佳答案

您没有显示您的Channels 类,但是 Binder 不知道您的“错误” channel 是“特殊”的。

Binder 可以配置重试并将异常路由到死信主题;见this PR在 1.0.0.RELEASE 中。

或者,您可以在服务激活器之前添加一个“中间流”网关 - 将其视为 Java 中的“try/catch” block :

@MessageEndpoint
public static class GatewayInvoker {

@Autowired
private ErrorHandlingGateway gw;

@ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
public void send(Message<?> message) {
this.gw.send(message);
}

}

@Bean
public GatewayInvoker gate() {
return new GatewayInvoker();
}

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {

void send(Message<?> message);

}

将您的服务激活器的输入 channel 更改为toService

您必须将 @IntegrationComponentScan 添加到您的配置类中,以便框架可以检测到 @MessagingGateway 接口(interface)并为其构建代理。

编辑

刚刚向我建议的另一种选择是在您的服务激活器的建议链中添加一个 ExpressionEvaluatingAdvice

关于java - Spring Cloud Stream @ServiceActivator 在异常时不向 errorChannel 发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37215072/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com