gpt4 book ai didi

go - 重消费Kafka消息的可能原因

转载 作者:数据小太阳 更新时间:2023-10-29 03:19:01 29 4
gpt4 key购买 nike

昨天从日志中发现,kafka group coordinator发起group rebalance后,kafka重新消费了一些消息。这些消息已在两天前使用(从日志中确认)。

日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次reblancing会导致重新消费消息呢?有什么问题?

我使用的是 golang kafka 客户端。这是代码

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest

而且我们在声明消息之前处理消息,因此我们似乎正在为 kafka 使用“至少发送一次”策略。我们在一台机器上有三个代理,而在另一台机器上只有一个消费者线程(go routine)。

对这种现象有什么解释吗?我认为这些消息一定已经提交,因为它们在两天前被消费了,否则为什么 kafka 会保留偏移量超过两天而不提交?

消费代码示例:

func (consumer *Consumer) ConsumeClaim(session 
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}

return nil
}

添加:

  1. 重新平衡发生在应用重启后。还有另外两次重启没有导致重新启动

  2. kafka的配置

    log.retention.check.interval.ms=300000
    log.retention.hours=168
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable = true
    auto.create.topics.enable=false

最佳答案

通过阅读golang saram client和kafka server的源码,最终找到原因如下

  1. Consumer group offset保留时间为24hours,这是kafka的默认设置,而log保留时间为 7天由我们明确设定。

  2. 我的服务器应用运行在测试环境中,几乎没有人可以访问,这意味着kafka生产者生产的消息可能很少,然后消费者组可以消费的消息很少,因此消费者可能不会提交任何消息抵消了很长时间。

  3. 当消费偏移量超过 24 小时未更新时,由于偏移量配置,kafka 代理/协调器将从分区中删除消费偏移量。下次 saram 从 kafka broker 查询偏移量在哪里时,客户端当然什么也得不到。注意我们使用sarama.OffsetOldest 作为初始值,然后 sarama 客户端将从 kafka broker 保存的消息开始消费消息,这会导致消息重新消费,这很可能会发生,因为日志保留是7天

关于go - 重消费Kafka消息的可能原因,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56852305/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com