gpt4 book ai didi

python - KafkaTimeoutError : Failed to update metadata after 60. 0 秒

转载 作者:行者123 更新时间:2023-12-04 15:26:14 30 4
gpt4 key购买 nike

我有一个高吞吐量 kafka 生产者的用例,我想每秒推送数千条 json 消息。

我有一个 3 节点的 kafka 集群,我使用的是最新的 kafka-python 库,并有以下方法来生成消息

def publish_to_kafka(topic):
data = get_data(topic)
producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
try:
for obj in data:
producer.send(topic, value=obj)
except Exception as e:
logger.error(e)
finally:
producer.close()

我的主题有 3 个分区。

方法有时会正常工作,但会失败并显示错误“KafkaTimeoutError:60.0 秒后无法更新元数据”。

我需要更改哪些设置才能使其顺利运行?

最佳答案

  • 如果某个主题不存在,而您正在尝试生成该主题,并且自动主题创 build 置为 false,则可能会发生这种情况。

    可能的分辨率:在代理配置 (server.properties) 中 auto.create.topics.enable=true (注意,这是 Confluent Kafka 中的默认设置)
  • 另一种情况可能是网络拥塞或速度,如果使用 Kafka 代理更新元数据的时间超过 60 秒。

    可能的分辨率:生产者配置:max.block.ms = 1200000 (120 秒,例如)
  • 检查您的代理是否由于某种原因(例如,负载过多)而关闭,以及为什么他们无法响应元数据请求。通常,您可以在 server.log 文件中看到它们。
  • 关于python - KafkaTimeoutError : Failed to update metadata after 60. 0 秒,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62198606/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com