gpt4 book ai didi

spring-boot - 带有 spring-kafka 的 Kafka 死信队列(DLQ)

转载 作者:行者123 更新时间:2023-12-04 20:29:43 28 4
gpt4 key购买 nike

什么是最好的实现方式死信队列 (DLQ) Spring Boot 2.0 应用程序中的概念使用 spring-kafka 2.1.x 来处理所有消息 @KafkaListener 一些 bean 发送到某个预定义的 Kafka DLQ 主题并且不会丢失单个消息的方法?
所以消耗的 Kafka 记录是:

  • 成功处理,
  • 处理失败,发送到DLQ主题,
  • 处理失败,没有发送到DLQ主题(由于意外问题)所以会再次被监听器消费。

  • 我尝试使用 的自定义实现创建监听器容器ErrorHandler 发送记录无法使用 KafkaTemplate 处理到 DLQ 主题。使用禁用的自动提交和 记录 确认模式。
    spring.kafka.enable-auto-ack=false
    spring.kafka.listener.ack-mode=RECORD

    @Configuration
    public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
    ...
    factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
    return factory;
    }
    }

    @Component
    public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
    log.error("Error, sending to DLQ...");
    kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
    }
    似乎这个实现不保证项目 #3 .如果在 DlqErrorHandler 记录中会抛出异常,则监听器将不会再次消费。
    使用事务监听器容器会有帮助吗?
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    有没有什么方便的方法可以使用 Spring Kafka 实现 DLQ 概念?
    更新 2018/03/28
    感谢 Gary Russell 的回答,我能够通过如下实现 DlqErrorHandler 来实现所需的行为
    @Configuration
    public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
    ...
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
    return factory;
    }
    }

    @Component
    public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    Consumerrecord<?, ? record = records.get(0);
    try {
    kafkaTemplate.send("dlqTopic", record.key, record.value());
    consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
    // Other records may be from other partitions, so seek to current offset for other partitions too
    // ...
    } catch (Exception e) {
    consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
    // Other records may be from other partitions, so seek to current offset for other partitions too
    // ...
    throw new KafkaException("Seek to current after exception", thrownException);
    }
    }
    }
    这样,如果消费者轮询返回 3 条记录 (1, 2, 3) 并且无法处理第二条记录:
  • 1 将被处理
  • 2 将无法处理并发送到 DLQ
  • 3 感谢消费者寻求record.offset() + 1,它将被传递给听众

  • 如果发送到 DLQ 失败,消费者会寻找 record.offset() 并且记录将重新传递给监听器(并且发送到 DLQ 可能会被停用)。
    更新 2021/04/30
    自 Spring Kafka 2.7.0 non-blocking retries and dead letter topics原生支持。
    看例子: https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
    重试通常应该是非阻塞的(在单独的主题中完成)并延迟:
  • 不干扰实时流量;
  • 不增加调用数量,本质上是发送垃圾请求;
  • 用于可观察性(获取重试次数和其他元数据)。
    使用 Kafka 实现非阻塞重试和 DLT 功能通常需要设置额外的主题并创建和配置相应的监听器。
    Kafka non-blocking retries and DLT
  • 最佳答案

    SeekToCurrentErrorHandler .

    当发生异常时,它会寻找消费者,以便在下一次轮询时重新传递所有未处理的记录。

    您可以使用相同的技术(例如子类)写入 DLQ 并在 DLQ 写入失败时查找当前偏移量(和其他未处理的),如果 DLQ 写入成功则仅查找剩余记录。

    关于spring-boot - 带有 spring-kafka 的 Kafka 死信队列(DLQ),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49507709/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com