gpt4 book ai didi

python - Kafka Python 消费者 API 不返回任何内容

转载 作者:太空宇宙 更新时间:2023-11-03 14:04:30 26 4
gpt4 key购买 nike

我正在使用Kafka-Python从卡夫卡经纪人的主题中读取,但我似乎无法让消费者迭代器返回任何内容

consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup")

for record in consumer:
print(record)

看起来它只是挂着。我已经验证该主题存在并且在代理上有数据并且正在生成新数据。当我更改对 KafkaConsumer 构造函数的调用并添加 auto_offset_reset="earliest"时,一切都会按预期工作,并且消费者迭代器返回记录。此参数的默认值是“latest”,但使用该值我似乎无法消耗数据。

为什么会出现这种情况?

最佳答案

在实例化 KafkaConsumer 时,您还需要包含 auto_offset_reset='smallest' ,这相当于命令行的 --from-beginning工具kafka-console-consumer.sh

consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup", auto_offset_reset='smallest')

您可能看到没有数据被消耗的原因可能是因为当您的消费者启动并运行时,生产者端不会生成任何数据。因此,您需要表明您想要消费主题中的所有数据(即使此时没有插入数据)。

根据official documentation :

The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.

关于python - Kafka Python 消费者 API 不返回任何内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48998955/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com