gpt4 book ai didi

Python-Kafka : Keep polling topic infinitely

转载 作者:行者123 更新时间:2023-12-05 01:08:25 24 4
gpt4 key购买 nike

我正在使用 python-kafka 来收听一个 kafka 主题并使用该记录。我想让它无限轮询而没有任何退出。这是我下面的代码:

def test():
consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)

这段代码只是读取数据并直接退出。有没有办法在没有推送消息的情况下继续收听主题?

任何持续监控主题的相关示例对我来说也很棒。

最佳答案

使用 confluent_kafka

import time
from confluent_kafka import Consumer


consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-1',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

while True:
try:
message = consumer.poll(10.0)

if not message:
time.sleep(120) # Sleep for 2 minutes

if message.error():
print(f"Consumer error: {message.error()}")
continue

print(f"Received message: {message.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")

使用 kafka-python

import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

while True:
try:
message = consumer.poll(10.0)

if not message:
time.sleep(120) # Sleep for 2 minutes

if message.error():
print(f"Consumer error: {message.error()}")
continue

print(f"Received message: {message.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")

关于Python-Kafka : Keep polling topic infinitely,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66101466/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com