gpt4 book ai didi

apache-kafka - 卡夫卡消费者 : Want to read same message again if not committed previous messages offset and auto commit is disabled

转载 作者:行者123 更新时间:2023-12-04 05:30:24 25 4
gpt4 key购买 nike

我已经关闭了自动提交,并且在阅读后也没有从消费者那里提交偏移量。

检查的消费者滞后也保持不变,它确保偏移量不会被提交。但问题是,它再次消耗下一条消息而不是相同的消息。

我怎样才能一遍又一遍地阅读相同的消息。只有在提交了先前的偏移量时,我才应该能够读取下一条消息。请帮我在这里做这件事。

最佳答案

如果您知道您的 kafka 消费者当前正在访问哪个分区,您可以使用 kafkaconsumer.seek(partition, offset) 方法告诉您的消费者从特定偏移量读取消息。
例子:

//to get the partition from consumer record
val partition: Int = consumerRecord.partition()

//to get offset of current record
val recordOffset = consumerRecord.offset()

if(data processing fail condition)
consumer.seek(new TopicPartition(topic, partition), recordOffset )

//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100)

关于apache-kafka - 卡夫卡消费者 : Want to read same message again if not committed previous messages offset and auto commit is disabled,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52774519/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com