gpt4 book ai didi

java - Spring 卡夫卡|如何使 DeserializationException 可重试?

转载 作者:行者123 更新时间:2023-12-02 01:38:18 25 4
gpt4 key购买 nike

我有一个以 Avro 格式发送消息的生产者和一个监听这些消息的消费者。

我还通过在我的 Consumer 中使用 @RetryableTopic 来实现非阻塞重试来处理错误。

当消费者无法反序列化消息时(由于架构更改或任何原因),它不会将该消息放入 -retry 主题中。它直接将其发送到 -dlt 主题。

我也希望重试 DeserializationException。原因是,当重试这些错误时,我可以在我的 Consumer 中部署修复程序,以便重试最终能够成功。

我尝试了 @RetryableTopic 中的 include 选项,但它似乎不适用于 DeserializationException

  @RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)

这是@RetryableTopic中的错误还是有其他方法可以实现此目的?

最佳答案

Spring Kafka 2.8.3 起,有一组 global fatal exceptions正如您所描述的,这将导致记录直接转发到 DLT

处理此类异常的通常模式是,在部署修复程序后,使用某种控制台应用程序从 DLT 检索失败的记录并重新处理它,可能通过发送记录回第一个重试主题,以便主主题中不会出现重复内容。

对于您描述的模式,您可以通过提供 DestinationTopicResolver bean 来管理这组全局的 FATAL 异常,例如:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
return ddtr;
}

请告诉我这是否适合您。谢谢。

关于java - Spring 卡夫卡|如何使 DeserializationException 可重试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71990209/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com