- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 spring-cloud-starter-stream-kafka 使用 spring cloud stream。我在 application.properties
中将我的 channel 绑定(bind)到 kafka 主题:
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
我无法让我的程序向错误 channel 生成异常消息。令人惊讶的是,它似乎甚至没有尝试生成它,即使我在不同的线程中(我有一个 @MessagingGateway
将消息转储到 gatewayOutput
,然后流程的其余部分异步发生)。这是我的 ServiceActivator
的定义:
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
FulfillingService {
@Override
@Audit(value = "annotatedEvent")
@ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
public void fulfill(TrivialRedemption redemption) throws Exception {
logger.error("FULFILLED!!!!!!");
throw new Exception("test exception");
}
}
这是生成的日志(我截断了完整的异常)。没有...
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}......2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
EDIT: Here is the content of my channels class:
public interface Channels {
public static final String GATEWAY_OUTPUT = "gatewayOutput";
public static final String ENRICHING_INPUT = "enrichingInput";
public static final String ENRICHING_OUTPUT = "enrichingOutput";
public static final String REDEEMING_INPUT = "redeemingInput";
public static final String REDEEMING_OUTPUT = "redeemingOutput";
public static final String FULFILLING_INPUT = "fulfillingInput";
public static final String FULFILLING_OUTPUT = "fulfillingOutput";
@Output(GATEWAY_OUTPUT)
MessageChannel gatewayOutput();
@Input(ENRICHING_INPUT)
MessageChannel enrichingInput();
@Output(ENRICHING_OUTPUT)
MessageChannel enrichingOutput();
@Input(REDEEMING_INPUT)
MessageChannel redeemingInput();
@Output(REDEEMING_OUTPUT)
MessageChannel redeemingOutput();
@Input(FULFILLING_INPUT)
MessageChannel fulfillingInput();
@Output(FULFILLING_OUTPUT)
MessageChannel fulfillingOutput();
最佳答案
您没有显示您的Channels
类,但是 Binder 不知道您的“错误” channel 是“特殊”的。
Binder 可以配置重试并将异常路由到死信主题;见this PR在 1.0.0.RELEASE 中。
或者,您可以在服务激活器之前添加一个“中间流”网关 - 将其视为 Java 中的“try/catch” block :
@MessageEndpoint
public static class GatewayInvoker {
@Autowired
private ErrorHandlingGateway gw;
@ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
public void send(Message<?> message) {
this.gw.send(message);
}
}
@Bean
public GatewayInvoker gate() {
return new GatewayInvoker();
}
@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {
void send(Message<?> message);
}
将您的服务激活器的输入 channel 更改为toService
。
您必须将 @IntegrationComponentScan
添加到您的配置类中,以便框架可以检测到 @MessagingGateway
接口(interface)并为其构建代理。
编辑
刚刚向我建议的另一种选择是在您的服务激活器的建议链中添加一个 ExpressionEvaluatingAdvice
。
关于java - Spring Cloud Stream @ServiceActivator 在异常时不向 errorChannel 发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37215072/
谁能告诉我这个错误? system.serviceModel/serviceHostingEnvironment/serviceActivations could not be found. 我正在尝
当你添加 @ServiceActivator方法上的注释,该方法可以有不同的返回类型,这似乎对服务有不同的影响: @ServiceActivator(inputChannel = "..", outp
我已经实现了Spring(启动)集成,我有一个InboundChannelAdapter,这个方法通过RestTemplate发送Web请求,并将结果传递给Splitter、Filter,最后到一个S
我有以下类(class): @Configuration public class SpringIntegrationTest { @Bean public SimpleWebServ
我有一个用 @ServiceActivator("CH1") 注释的方法,其中“CH1”定义是: @Bean(name = "CH1") MessageChannel ch1() {
我有一个远程服务,当特定事件发生时,我调用该服务来加载产品的定价数据。加载后,产品定价就会被广播,以供其他消费者在其他地方进行处理。 调用代码并不关心响应 - 它是“即发即忘”,响应应用程序事件并触发
我有一个带有 ServiceActivator 方法的组件类: @Component("payloadService") public class PayloadService { @Tran
我想将 Kafka 与 Spring Boot 和 Avro 模式一起使用。 但我被困在“这三个听众之间有什么区别?”。 创建监听器有 3 种可能性: 使用 - @KafkaListener 注释
在使用@StreamListener 监听 kafka 主题后,在 RuntimeException、全局 erroChannel 或主题特定的 errorChannel (topic.group.e
我使用简单的代码来处理来自 Google PubSub 订阅的消息并将结果返回到主题中。但由于某种原因,添加到输入消息的所有 header 也会出现在输出消息 header 上。 有办法消除这种行为吗
我正在使用 spring-cloud-starter-stream-kafka 使用 spring cloud stream。我在 application.properties 中将我的 channe
我想用 Transactional 注释我的 ServiceActivator,如下所示: @ServiceActivator @Transactional(rollbackFor = Excepti
我有这些版本的遗留代码(Spring 版本:3.2.1.RELEASE 和 Spring Integration 版本 2.2.3.RELEASE)。我想将spring集成的所有xml配置迁移到jav
我想在 Java 8 默认接口(interface)方法上使用 @ServiceActivator 注释。此默认方法将根据业务规则委托(delegate)给此接口(interface)的另一个方法。
我在 TCP 网关上构建的 Spring Integration 应用程序运行良好。它接收到达 TCP 网关的请求消息,并将消息转发给 serviceActivator 以准备响应,并将响应发送给客户
我目前正在 integration-context.xml 文件中指定服务激活器和相关的发布订阅 channel 。像这样的东西(删节版): 现在我需要指定一个自定义执行程序类(用于 MDC
当使用 StructureMap 在 WebAPI 中实现 DI 时,我们使用了 中的 ServiceActivator Configuring Dependency Injection with A
我是一名优秀的程序员,十分优秀!