gpt4 book ai didi

python - 如何使用 confluent-kafka-python 确定是否存在 kafka 主题

转载 作者:行者123 更新时间:2023-12-03 23:47:36 25 4
gpt4 key购买 nike

我正在使用 confluent-kafka-python 包与 Kafka 服务器交互。我可以成功创建主题并向其推送事件。但是,我的问题在于当我启动多个节点(在 Docker 中运行)时,如果第二个实例也尝试创建主题,我会收到错误消息。在创建新主题之前,我需要先检查主题是否已经存在。

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic): # <-- how to accomplish this?
new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
results = kafka_admin.create_topics([new_kafka_topic])

谢谢你的帮助!

最佳答案

我遇到了同样的问题,我通过以下方式管理它:

client = AdminClient({"bootstrap.servers": BROKER_URL})
topic_metadata = client.list_topics()
if topic_metadata.topics.get(self.topic_name) is None:
self.create_topic()

关于python - 如何使用 confluent-kafka-python 确定是否存在 kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61600057/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com