gpt4 book ai didi

go - 如何在 Golang Kafka 10 中获取分区的消费者组偏移量

转载 作者:IT王子 更新时间:2023-10-29 01:48:34 27 4
gpt4 key购买 nike

现在 Golang Kafka 库 (sarama) 正在提供消费者组功能,而无需任何外部库帮助使用 kafka 10。我如何才能在任何给定时间获取消费者组正在处理的当前消息偏移量?

之前我使用 kazoo-go ( https://github.com/wvanbergen/kazoo-go ) 来获取我的消费者组消息偏移量,因为它存储在 Zookeeper 中。现在我使用 sarama-cluster ( https://github.com/bsm/sarama-cluster ),我不确定使用哪个 API 来获取我的消费者组消息偏移量。

最佳答案

在幕后,consumerGroupSession 结构正在使用 PartitionOffsetManagerget next offset :

    if pom := s.offsets.findPOM(topic, partition); pom != nil {
offset, _ = pom.NextOffset()
}

这是 pom.NextOffset() 的文档.

consumerGroupSession 通过 newConsumerGroupClaim() 方法构造一个 consumerGroupClaim 结构时,它传递偏移量,由 pom.NextOffset( ),作为 offset 参数。您可以稍后通过 claim.InitialOffset() 访问它。开始消费消息后,您可以使用当前处理的消息的 message.Offset

不幸的是,无法从 ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) 方法访问 consumerGroupSession.offsets.findPOM(),因为它作为 ConsumerGroupSession interface 接收 session ,而不是作为 consumerGroupSession struct。所以 offsets 变量是私有(private)的,不可访问。

因此我们无法真正访问 NextOffset() 方法,而该方法正是 OP 想要的。

关于go - 如何在 Golang Kafka 10 中获取分区的消费者组偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40642689/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com