gpt4 book ai didi

python - 卡夫卡消费者 : How to start consuming from the last message in Python

转载 作者:太空狗 更新时间:2023-10-29 22:30:23 26 4
gpt4 key购买 nike

我正在使用 Kafka 0.8.1 和 Kafka python-0.9.0。在我的设置中,我有 2 个 kafka 代理设置。当我运行我的 kafka 消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!

我的问题是,当我重新启动消费者时,它会从头开始消费消息。我所期望的是,重启后,消费者会从它死前停止的地方开始消费消息。

我确实尝试跟踪 Redis 中的消息偏移量,然后在从队列中读取消息之前调用 consumer.seek 以确保我只收到我以前从未见过的消息。虽然这行得通,但在部署此解决方案之前,我想与大家核实一下……也许我对 Kafka 或 python-Kafka 客户端有一些误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。

谢谢!

最佳答案

注意 kafka-python 库。它有一些小问题。

如果速度对您的消费者来说不是真正的问题,您可以在每条消息中设置自动提交。它应该有效。

SimpleConsumer 提供了一个 seek 方法 ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185 ),让您可以在任意时间点开始消费消息。

最常见的调用是:

  • consumer.seek(0, 0) 从队列的开头开始读取。
  • consumer.seek(0, 1) 从当前偏移量开始读取。
  • consumer.seek(0, 2) 跳过所有未决消息并开始只读取新消息。

第一个参数是这些位置的偏移量。这样,如果您调用 consumer.seek(5, 0),您将跳过队列中的前 5 条消息。

另外,不要忘记,偏移量是为消费者组存储的。确保您一直使用同一个。

关于python - 卡夫卡消费者 : How to start consuming from the last message in Python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24661533/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com