作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我使用 pykafka 从 kafka 主题中获取消息,然后进行一些处理并更新到 mongodb。由于pymongodb每次只能更新一项,所以我启动了100个进程。但是在启动时,某些进程出现错误“PartitionOwnedError and ConsumerStoppedException”。我不知道为什么。谢谢你。
kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]
balanced_consumer = topic.get_balanced_consumer(
consumer_group=group,
auto_commit_enable=kafka_cfg['auto_commit_enable'],
zookeeper_connect=kafka_cfg['zookeeper_list'],
zookeeper_connection_timeout_ms = kafka_cfg['zookeeper_conn_timeout_ms'],
consumer_timeout_ms = kafka_cfg['consumer_timeout_ms'],
)
while(1):
for msg in balanced_consumer:
if msg is not None:
try:
value = eval(msg.value)
id = long(value.pop("id"))
value["when_update"] = datetime.datetime.now()
query = {"_id": id}}
result = collection.update_one(query, {"$set": value}, True)
except Exception, e:
log.error("Fail to update: %s, msg: %s", e, msg.value)
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
self._raise_worker_exceptions()
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
raise ex
pykafka.exceptions.PartitionOwnedError
最佳答案
PartitionOwnedError:检查是否有一些后台进程在同一个consumer_group中消费,可能没有足够的可用分区来启动另一个消费者。
ConsumerStoppedException:您可以尝试升级您的 pykafka 版本 (https://github.com/Parsely/pykafka/issues/574)
关于pykafka - 为什么在启动一些消费者时出现错误 PartitionOwnedError 和 ConsumerStoppedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39615502/
我使用 pykafka 从 kafka 主题中获取消息,然后进行一些处理并更新到 mongodb。由于pymongodb每次只能更新一项,所以我启动了100个进程。但是在启动时,某些进程出现错误“Pa
我是一名优秀的程序员,十分优秀!