gpt4 book ai didi

apache-kafka - Python confluent kafka 在代理连接断开时引发异常

转载 作者:行者123 更新时间:2023-12-05 06:09:44 26 4
gpt4 key购买 nike

我正在使用 python 3.7 和 confluent-kafka。

以下是我用来轮询 kafka 服务器并读取消息的伪代码。

        while True:
MSG = CONSUMER.poll(0.1)
if MSG is None:
CONSUMER.commit()
print('No msg')
continue
if MSG.error():
print("Consumer error: {}".format(MSG.error()))
CONSUMER.commit()
continue
try:
rawMsg = format(MSG.value().decode('utf-8'))
testmsg = json.loads(rawMsg)
except:
print('invalid json format msg')
CONSUMER.commit()

如果 kafka 服务器由于某种原因宕机或断开连接,我希望抛出异常。

目前,如果发生上述情况,while 循环将继续运行而不会出现任何错误并打印No msg

如何在循环中每次获取异常或检查是否可以连接 kafka 服务器(如果要进行一些检查,它应该是轻量级的)

最佳答案

创建消费者时,您可以在反序列化器中指定 a callback for error .

这是在生产者中使用相同机制的示例:

import confluent_kafka
def error_callback(err):
print("callback hit!")
raise(err)
p = confluent_kafka.Producer({
"bootstrap.servers": "localhost:9092",
"message.max.bytes": 10_000_000,
"error_cb": error_callback,
"debug": "msg",
})
p.produce("test-topic", "a" * int(2e6))
p.flush()

From github issues .这可能会有所帮助,但不能解决问题。

关于apache-kafka - Python confluent kafka 在代理连接断开时引发异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64662926/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com