gpt4 book ai didi

python - KafkaProducer for Python 如果在命令行上完成则发送消息,但如果通过 Python 脚本完成则不会发送消息

转载 作者:行者123 更新时间:2023-12-01 02:24:45 29 4
gpt4 key购买 nike

我的动物园管理员和经纪人正在运行。

当我像这样从命令行发送消息时:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('topic', b'some_message_bytes')

我能够在运行此命令的消费者的另一端接收它:

from kafka import KafkaConsumer
consumer = KafkaConsumer('topic')
for msg in consumer:
print msg

但是,当我在 python 脚本中包含与生产者完全相同的代码并运行它时,消费者不会打印消息,而且,它甚至不会被发送。我通过在命令行上运行它来确认这一点:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
localhost:9092 --topic topic --time -1 --offsets 1 | awk -F ":" '{sum +=
$3} END {print sum}'

这显示当前主题中的消息数。当我从 python 生产者的命令行版本发送更多消息时,它会上升,但运行 python 脚本后主题中的消息数量不会增加。

有人知道这是怎么回事吗?

最佳答案

在 shell 中运行程序与在文件中运行程序是不同的。在 shell 中,当您发送消息时,它会在 Kafka 中以固定的刷新时间发送,并且 Kafka 生产者实例在 shell 中仍然处于事件状态。当在文本文件中运行代码时,即使在刷新消息之前,程序也会结束,并且生产者对象会被垃圾回收。

要在生产者代码中修复此问题,请使用 Producer.flush()

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('topic', b'some_message_bytes')
# This is important
producer.flush()

关于python - KafkaProducer for Python 如果在命令行上完成则发送消息,但如果通过 Python 脚本完成则不会发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47518399/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com