gpt4 book ai didi

go - 卡夫卡消费者 : How to programatically consume from specific offset in Go Sarama

转载 作者:行者123 更新时间:2023-12-01 21:10:35 28 4
gpt4 key购买 nike

最近,我开始研究使用kafka。我正在处理的项目使用 sarama

我使用 ConsumerGroup 阅读消息。

如果 foo 返回 false,我需要在一段时间后再次阅读消息。如何做到这一点?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for message := range claim.Messages() {

if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}

}

return nil
}

最佳答案

您可以通过在消费者组的 Setup() 回调中包含以下内容,将消费者组的偏移量重置为旧偏移量:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
sess.ResetOffset(topic, partition, offset, "")

return nil
}

您也可以通过控制台实现相同的目的:

kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic myTopicName \
--reset-offsets \
--to-offfset 100 \
--execute

关于go - 卡夫卡消费者 : How to programatically consume from specific offset in Go Sarama,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61593064/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com