gpt4 book ai didi

python - Kafka producer.send 从不发送消息

转载 作者:行者123 更新时间:2023-12-03 22:55:48 25 4
gpt4 key购买 nike

我使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我正在尝试测试一个简单的生产者:

class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))

当这个过程被实例化时,消费者永远不会收到消息

如果我刷新生产者并更改 linger_ms 参数(使其同步),则消息将由消费者发送和读取:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()

在以前的 Kafka 版本中,有参数 queue.buffering.max.ms 来指定生产者将等待多长时间才能发送队列中的消息,但在最新版本(kafka-python 1.3.3)中不存在。我如何在较新的 Kafka 版本中指定它以保持我的通信异步?

谢谢!

最佳答案

正如您所观察到的,消息排队等待异步发送,并且不能保证它会立即发送。所以如果要强制将消息发送给broker,需要显式调用producer.flush()它将阻塞直到消息被发送(尽管 flush() 不保证确认)。

注意:因为 flush()是阻塞调用,通常只推荐用于低吞吐量系统或应用程序关闭时。同步发送与异步发送的吞吐量命中通常对于大容量系统是不可行的。我的经验是,生产者通常会很快发送而不需要调用flush(),除了测试套件/开发需要立即发生的地方。

我相当确定参数 queue.buffering.max.ms已替换为 linger_ms : https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer

因此,您已经在工作示例中使用了该参数。

关于python - Kafka producer.send 从不发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45092857/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com