gpt4 book ai didi

java - Spring Cloud Stream Kafka错误 channel

转载 作者:行者123 更新时间:2023-12-02 11:32:35 24 4
gpt4 key购买 nike

我正在尝试设置一个绑定(bind),将 Kafka 消息从 Spring Integration errorChannel 转发到自定义 channel (用于集中式错误处理)。

错误消息正在发送到配置的 channel ,但它们以带有 byte[] 负载的 GenericMessage 形式到达,其中包含异常详细信息和失败的消息。

我的配置:

spring:
cloud:
stream:
kafka:
bindings:
accountOut.producer:
sync: true
binder:
autoCreateTopics: false
headers:
- spanId
- spanTraceId
- spanSampled
- spanParentSpanId
- spanName
- spanFlags
- eventType
- Authorization
bindings:
error:
destination: test-error
accountOut:
producer.partitionKeyExpression: payload.key
content-type: application/json
destination: account
kafka:
producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer

我正在使用 @StreamListener(target = "kieran-error") 进行监听消费者配置了 @Input("kieran-error") SubscribableChannel

阅读the docs ,我期望消息以 ErrorMessage 形式到达。有什么办法可以实现这个目标吗?或者将有效负载配置为作为对象到达?

我正在使用的版本:

  • Spring Boot 1.5.8
  • Spring Cloud Edgware
  • Kafka 11 客户端
  • Spring Integration 核心 4.3.12

--- 问题更新 ---

我现在意识到我可以配置 Spring Integration 通过监听 errorChannel 转发到 Kafka 主题,例如

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
return handler;
}

但是是否可以在属性 yaml 而不是代码中配置此流程?这就是所有其他 Kafka 配置所在的位置,因此在代码中配置 kafka 模板并不理想。

另一种选择是显式监听 ErrorMessage 并发送到代码中的 kafka 输出 channel :

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
outputChannel.kieranError().send(...)
}

最佳答案

您到底在哪里使用这样的消息?

您描述的消息听起来像是当消费者的 enableDlq 为 true 时发送到 DLQ 主题的消息;您没有显示消费者配置,因此我很难猜测。

发送到特定于目标的错误 channel (并桥接到全局 errorChannel)的 ErrorMessage 可以使用

@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
...
}

error:
destination:

是遗留的,旨在让用户代码将消息发送到将转到该主题的errorChannel

关于java - Spring Cloud Stream Kafka错误 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49193556/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com