gpt4 book ai didi

spring-cloud-stream kafka 错误处理

转载 作者:行者123 更新时间:2023-12-03 19:50:41 30 4
gpt4 key购买 nike

我浏览了 spring-cloud-stream 1.0.0.RELEASE 的文档,似乎找不到任何有关错误处理的文档。

根据对 kafka 0.9 的观察,如果我的消费者抛出 RuntimeException,我会看到 3 次重试。 3 次重试后,我在日志中看到:

2016-05-17 09:35:59.216 ERROR 8983 --- [  kafka-binder-] o.s.i.k.listener.LoggingErrorHandler     : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message

此时,消费者偏移量滞后 1,如果我重新启动消费者,则再次重试消息 3 次。但是,如果我随后将另一条消息发送到同一分区以使消费者不引发异常,则消费者偏移量将被更新,并且我们为其引发异常的原始消息在重新启动后将不再重试。

这是否记录在我没有找到的地方?错误处理是特定于活页夹的,还是 s-c-s 将其抽象为在活页夹之间保持一致?我怀疑这是消费者偏移量如何使用 kafka binder 更新的意外结果。我看到添加了一个 enableDlq kafka 消费者属性,我即将对其进行测试,但我不确定我们如何处理 kafka 中的死信。我对 rabbitmq 中的死信队列很熟悉,但是使用 rabbitmq,我们可以使用 rabbitmq shovel 插件来重新发布和重试 dlq 消息,以涵盖由于临时服务中断导致失败的情况。除了自己编写类似的实用程序外,我不知道 kafka 有任何类似的功能。

更新:启用 enableDlq kafka 消费者属性的测试显示了与错误处理相同的消费者偏移问题。当消费者抛出 RuntimeException 时,我看到 3 次重试,之后没有记录错误消息,我看到一条消息发布到 error.<destination>.<group>如文档所述,但消费者偏移量未更新并滞后 1。如果我重新启动消费者,它会尝试再次处理来自原始主题分区的相同失败消息,重试 3 次并将相同消息放在 error.<destination>.<group> 上再次主题(重复的 dlq 消息)。如果我将另一条消息发布到消费者未引发 RuntimeException 的同一主题分区,则偏移量将更新,并且在重新启动时不再重试原始失败消息。

我认为无论 enableDlq 是否为真,当消费者抛出错误时,消费者都应该更新 kafka 中的消费者偏移量。这至少可以使所有重试失败的消息被丢弃(当 enableDlq 为 false 时)或发布到 dlq 并且永远不会重试(当 enableDlq 为 true 时)。

最佳答案

对我来说看起来像一个错误 - 监听器容器有一个属性 autoCommitOnError (false 默认情况下)不被活页夹暴露(或设置)。如果 bool 值为真,则在调用错误处理程序(发布错误)后,提交偏移量。

请在 github 中将其报告为问题.

关于spring-cloud-stream kafka 错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37282045/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com