gpt4 book ai didi

python - 在 Python 中指示 group_id 时,Kafka 未收到消息

转载 作者:行者123 更新时间:2023-11-28 19:01:16 25 4
gpt4 key购买 nike

我正在使用 Kafka (kafka-python) 版本 3.0.0-1.3.0.0.p0.40。我需要为 Python 中的“模拟”主题配置消费者。当我不指明 group_id,即 group_id = None 时,它​​可以正常接收消息。但是,如果我指明 group_id,它不会收到任何消息。

这是我的 Python 代码:

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])

while not self.stop_event.is_set():
for message in consumer:
print(message)

我试图在消费者属性文件中搜索 group_id 的一些默认值,我找到了一个 cloudera_mirrormaker,但没有任何改变。我将需要使用多个消费者,因此我有一个 group_id 并且他们共享相同的 group_id 很重要。在许多来源中,我发现 group_id 可以是任何字符串...

当我在控制台中运行该主题的消费者时,它可以工作并接收消息

./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID  --partition 0

当我运行 kafka-consumer-groups.sh 以列出所有可用组时,它是空的。

如果有人知道为什么它卡在 Python 中,将不胜感激。非常感谢

这是生产者的代码(为了简单起见,我已经减少了它,因为在这种情况下它不会改变问题)

from kafka import KafkaProducer
class Producer(threading.Thread):
...
def run(self):
producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
while not self.stop_event.is_set():
string = 'test %s' %time.time()
producer.send('simulation', string.encode())
time.sleep(0.5)
producer.close()

最佳答案

我终于解决了。

那是我的问题:omkafka 配置文件 partitions.number attr 默认为 1

我们根据需要将其更改为 100,它开始工作了!希望对你有帮助

关于python - 在 Python 中指示 group_id 时,Kafka 未收到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52435151/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com