gpt4 book ai didi

apache-kafka - 为什么 kafka 消费者数百次消费相同的消息?

转载 作者:行者123 更新时间:2023-12-04 04:18:49 27 4
gpt4 key购买 nike

我从日志中看到 665 次消耗了完全相同的消息。为什么会发生这种情况?

我也在日志中看到了这一点

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies
that the poll loop is spending too much time message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

消费者属性
group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20

PS:是否可以从队列中的 1000 条消息中只消耗特定数量的消息,例如 10 条、50 条或 100 条消息?
我正在查看 'fetch.max.bytes' 配置,但它似乎是针对消息大小而不是消息数量。

谢谢

最佳答案

答案在于对以下概念的理解:

  • session.timeout.ms
  • 心跳
  • max.poll.interval.ms

  • 在您的情况下,您的消费者通过 poll() 收到一条消息,但无法在 max.poll.interval.ms 时间内完成处理。因此,假设被 Broker 挂起,并且由于该消费者失去了所有分区的所有权而发生了分区的重新平衡。它被标记为死亡,不再是消费者组的一部分。

    然后,当您的消费者完成处理并再次调用 poll() 时,会发生两件事:
  • 提交失败,因为消费者不再拥有分区。
  • Broker 识别出消费者再次启动,因此触发重新平衡,消费者再次加入消费者组,开始拥有分区并从 Broker 请求消息。由于较早的消息未标记为已提交(请参阅上面的 #1,提交失败)并且正在等待处理,因此代理再次将相同的消息传递给消费者。

  • 消费者再次花费大量时间进行处理,并且由于无法在小于 max.poll.interval.ms 的时间内完成处理,因此 1. 和 2. 继续循环重复。

    要解决此问题,您可以根据消费者需要多长时间进行处理,将 max.poll.interval.ms 增加到足够大的值。这样你的消费者就不会被标记为死亡,也不会收到重复的消息。
    但是,真正的解决方法是检查您的处理逻辑并尝试减少处理时间。

    关于apache-kafka - 为什么 kafka 消费者数百次消费相同的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51736582/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com