gpt4 book ai didi

Python kafka消费者不会消费来自生产者的消息

转载 作者:行者123 更新时间:2023-12-01 08:22:03 27 4
gpt4 key购买 nike

所以我对卡夫卡还很陌生。我尝试运行一个简单的 Kafka 消费者和生产者。当我运行我的消费者时,它会在 for 循环之前打印 hello 。但是 for 循环中没有打印任何内容,这让我相信它一开始就不会进入 for 循环,并且消费者不会消费来自生产者的消息。我在 Linux 系统上运行这个。

任何人都可以就生产者或消费者可能出现的问题提供建议吗?我已经展示了我的生产者和消费者代码,它们都只有几行代码。

这是我的制作人:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')

这是我的消费者:

from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
group_id=None,
enable_auto_commit=False,
auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
print(message)

因此,当运行我的生产者时,它最终会出现错误。任何人都知道这意味着什么,以及这是否可能是错误的地方。

File "producer.py", line 3, in <module>
producer.send('MyFirstTopic1', 'Hello, World!')
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

最佳答案

看来您在客户端配置中使用了错误的主机。 localhost:2181 通常是 Zookeeper 服务器。

为了让您的客户端正常工作,您需要将 bootstrap_servers 设置为 Kafka 代理主机名和端口。默认情况下,这是localhost:9092

参见https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaProducer.html

关于Python kafka消费者不会消费来自生产者的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54578364/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com