
pykafka - Why do PartitionOwnedError and ConsumerStoppedException occur when starting some consumers?

Reposted. Author: 行者123. Updated: 2023-12-02 03:11:37

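I use pykafka to fetch messages from a Kafka topic, do some processing, and then update MongoDB. Because pymongo can only update one document at a time, I start 100 processes. At startup, however, some of the processes fail with PartitionOwnedError and ConsumerStoppedException. I don't know why. Thank you.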

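import datetime

from pykafka import KafkaClient

# conf, topic_name, group, collection and log are defined elsewhere in the script.
kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=group,
    auto_commit_enable=kafka_cfg['auto_commit_enable'],
    zookeeper_connect=kafka_cfg['zookeeper_list'],
    zookeeper_connection_timeout_ms=kafka_cfg['zookeeper_conn_timeout_ms'],
    consumer_timeout_ms=kafka_cfg['consumer_timeout_ms'],
)
while True:
    # The for loop ends if consumer_timeout_ms elapses with no message;
    # the outer loop then starts iterating again.
    for msg in balanced_consumer:
        if msg is not None:
            try:
                # NOTE: eval() on message data is unsafe for untrusted input.
                value = eval(msg.value)
                id = long(value.pop("id"))  # Python 2 only (long)
                value["when_update"] = datetime.datetime.now()
                query = {"_id": id}

                # upsert=True inserts the document when no match exists.
                result = collection.update_one(query, {"$set": value}, upsert=True)
            except Exception as e:
                log.error("Fail to update: %s, msg: %s", e, msg.value)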


Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
    raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException


Traceback (most recent call last):
  File "dump_daily_summary.py", line 182, in <module>
    dump_daily_summary.run()
  File "dump_daily_summary.py", line 133, in run
    for msg in self.balanced_consumer:
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
    message = self.consume(block=True)
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
    self._raise_worker_exceptions()
  File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
    raise ex
pykafka.exceptions.PartitionOwnedError

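Best answer

PartitionOwnedError: check whether some background process is already consuming in the same consumer_group. There may not be enough free partitions to start another consumer.

ConsumerStoppedException: try upgrading your pykafka version (https://github.com/Parsely/pykafka/issues/574)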
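
To make the point about free partitions concrete, here is a minimal sketch of a range-style split of partitions across the consumers in one group. It is a simplified illustration, not pykafka's actual code, and the names assign_partitions and idle_consumers as well as the partition count of 32 are assumptions (the question does not say how many partitions the topic has). It only shows how many consumers would be left without a partition; it does not reproduce either exception.

```python
def assign_partitions(partition_ids, consumer_ids):
    """Split sorted partitions into contiguous ranges, one per sorted consumer.

    Hypothetical helper for illustration only.
    """
    partitions = sorted(partition_ids)
    consumers = sorted(consumer_ids)
    # Every consumer gets `base` partitions; the first `extra` get one more.
    base, extra = divmod(len(partitions), len(consumers))
    assignment = {}
    for idx, consumer in enumerate(consumers):
        start = base * idx + min(idx, extra)
        count = base + (1 if idx < extra else 0)
        assignment[consumer] = partitions[start:start + count]
    return assignment


def idle_consumers(assignment):
    """Return the consumers that ended up with no partition at all."""
    return sorted(c for c, parts in assignment.items() if not parts)


if __name__ == "__main__":
    partitions = range(32)  # assumed partition count, not from the question
    consumers = ["proc-%03d" % i for i in range(100)]  # 100 processes, as in the question
    assignment = assign_partitions(partitions, consumers)
    idle = idle_consumers(assignment)
    print("%d of %d consumers would own no partition" % (len(idle), len(consumers)))
    # prints "68 of 100 consumers would own no partition"
```

If the real topic has fewer partitions than there are processes, the extra processes add no throughput, so it may be worth matching the process count to the partition count.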

Regarding "pykafka - Why do PartitionOwnedError and ConsumerStoppedException occur when starting some consumers", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39615502/
