gpt4 book ai didi

python - Producer.poll(0) 不会产生任何消息,但 Producer.flush() 可以工作

转载 作者:太空宇宙 更新时间:2023-11-03 20:02:16 38 4
gpt4 key购买 nike

当我使用 Producer.flush() 时,它可以工作,但根据 kafka confluent issue 性能较差,但按照建议,我使用 Producer.poll(0) 但不会向主题生成任何消息,是否需要任何配置或我在这里遗漏了某些内容?

self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)

self.producer.poll(0) # -> doesn't work
self.producer.flush() # -> works

最佳答案

消息未发送到 kafka,因为没有时间执行此操作。您的应用程序将提前终止。

这会起作用:

self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)
timer.sleep(1) -- sleep for one seconds.
self.producer.poll(0)

生产者有两个缓冲区。第一个发送缓冲区,第二个响应缓冲区(来自kafka的响应)。

方法product(...) - 正在向发送缓冲区添加新消息。默认情况下,后台线程会尝试尽快发送消息,但仍然需要时间来执行此操作。

方法poll(0) - 正在检查响应缓冲区并执行回调方法。如果缓冲区为空,则不会发生任何事情。

方法flush() - 正在检查两个缓冲区,直到所有消息都将被处理并且正在执行回调方法。在退出应用程序之前使用此方法。

使用示例。

def send(topic,message,callback_report):
producer.produce(topic,message,callback=callback_report)
producer.pool(0) // execute callback for previous messages,

for msg in big_collection_of_messages:
send('blabla',msg,delivery_report)


producer.flush()
//END OF APPLICATION

注意。这个解释是一个很大的简化。

关于python - Producer.poll(0) 不会产生任何消息,但 Producer.flush() 可以工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59172333/

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com