gpt4 book ai didi

kafka-python - 如何为 kafka-python kafka.KafkaProducer#send() 添加失败回调?

转载 作者:行者123 更新时间:2023-12-03 16:05:10 25 4
gpt4 key购买 nike

如果生成的记录失败,我想设置要触发的回调。最初,我只想记录失败的记录。

Confluent Kafka python 库提供了一种添加回调的机制:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

我怎样才能用 kafka-python 实现类似的行为 kafka.KafkaProducer#send()不必使用弃用的 SimpleClient 使用 kafka.SimpleClient#send_produce_request()

最佳答案

虽然它没有记录,但这相对简单。每当您发送消息时,您都会立即收到 Future背部。您可以在 Future 后面附加回调/错误提示:

F = producer.send(topic=topic, value=message, key=key)
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method)

相关源代码在这里:
https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

我们真的应该记录下来,我提交了 https://github.com/dpkp/kafka-python/issues/1256跟踪它。

关于kafka-python - 如何为 kafka-python kafka.KafkaProducer#send() 添加失败回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46388116/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com