- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在使用@StreamListener 监听 kafka 主题后,在 RuntimeException、全局 erroChannel 或主题特定的 errorChannel (topic.group.errors) 未收到任何错误消息。 @ServiceActivator 没有收到任何东西。
POM Dependencies : Greenwich.RELEASE
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
application.properties
spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName=input_deadletter
spring.cloud.stream.kafka.streams.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.content-Type=application/*+avro
spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.schemaRegistryClient.endpoint.schema.avro.schema-locations=classpath:avro/*.avsc
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8082
spring.cloud.stream.kafka.streams.binder.application-id=myGroup
spring.cloud.stream.kafka.streams.binder.serdeError=sendtodlq
Registering beans for JMX exposure on startup
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel input.myGroup.errors
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name="input-myGroup.errors"': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name="input.myGroup.errors"] org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel errorChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=errorChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=errorChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel nullChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=nullChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=nullChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler errorLogger
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.error.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.errorGlobal.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint]
org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor - No @KafkaListener annotations found on bean type: class org.springf
@SendTo(MyStreams.OUTPUT)
public KStream<Key, MyEntity> process(KStream<Key, Envelope> myStreamObject) {
return myStreamObject.mapValues(this::transform);
}
@ServiceActivator(inputChannel = "input.myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}
@ServiceActivator(inputChannel = "errorChannel")
public void errorGlobal(Message<?> message) {
System.out.println("Handling ERROR: GLOBAL " + message);
}
最佳答案
卡夫卡流活页夹不是基于 MessageChannel
s 所以没有Message<?>
发送到错误 channel 。
标准的 kafka 活页夹是 MessageChannelBinder
并支持错误 channel 。
使用 Kafka Streams,您必须实现自己的错误处理。
关于error-handling - 应用程序运行时异常未发送到 errorChannel 或 ServiceActivator 无法监听 errorChannel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56486658/
在使用@StreamListener 监听 kafka 主题后,在 RuntimeException、全局 erroChannel 或主题特定的 errorChannel (topic.group.e
在之前的 Spring 集成应用程序中,我能够订阅应用程序 errorChannel,并且我的失败消息(向外发送)最终会出现在我的(预-定义的)处理程序。 但是,现在我正在尝试在一个更大的应用程序中实
在我的应用程序中,我的网关正在接收 header 中的请求标识符,每当发生错误时,错误 channel 都会收到异常。但看起来,errorChannel 不从网关获取 header 。 是否可以将 h
我在 xml 中定义了一个流程,如下所示:
我目前有一个拆分器,它将消息拆分为类似的消息,其中我只添加一个 header 值,该 header 值在经过编排时会导致不同的响应。 最后,这些消息被发送到聚合器以将响应聚合在一起。 目前,它们以多线
我发现“ 默认 “errorChannel”是一个 PublishSubscribeChannel”,但我无法理解 为什么 .我们可以使用直接 channel 吗?什么是缺点使用直接 channel
我编写了一个程序来从 Solace 队列读取消息。我收到以下错误。 你能帮忙吗? 代码如下: 我的主要配置如下: public class ReadFromQueueConfig { @Aut
我正在使用 spring-cloud-starter-stream-kafka 使用 spring cloud stream。我在 application.properties 中将我的 channe
我正在使用Spring Integration处理一个简单的基于Kafka的项目,我们要求当代理关闭时,消息将传递到ErrorChannel,我们可以处理它们/保存为“死信”等。 我们得到的是无数的异
为了使用带有 Kafka binder 的 Spring Cloud Stream 3.1.1 管理长时间运行的任务,我们需要使用 Pollable Consumer 在单独的线程中手动管理消费,这样
Jms outbound-channel-adapter 工作得很好,但我在日志中间歇性地看到此错误,但是,MQ 消息仍然得到传递。 2019-06-07 10:16:22 [JMSCCThreadP
我的应用程序中有以下 xml 配置,我想将其转换为 Java DSL。 因此,在此引用中,我明确定义了错误 channel 的名称。主要是出于示例原因。有了这个引用,我期望发生的事情是当下游进程抛出异
我是一名优秀的程序员,十分优秀!