gpt4 book ai didi

python - Kafka 一遍又一遍地重放消息 - 心跳 session 已过期 - 标记协调器已死

转载 作者:太空宇宙 更新时间:2023-11-04 02:41:08 66 4
gpt4 key购买 nike

使用 python kafka api 从主题中读取只有少量消息的消息。 Kafka 不断重复播放队列中的消息。

它从我的主题接收到一条消息(返回每条消息内容),然后抛出.更多日志:

kafka.coordinator - ERROR - Heartbeat session expired - marking coordinator dead
kafka.coordinator - WARNING - Marking the coordinator dead (node 1) for group GROUPID1: Heartbeat session expired.
kafka.coordinator.consumer - WARNING - Auto offset commit failed for group GROUPID1: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
kafka.cluster - INFO - Group coordinator for GROUPID1 is BrokerMetadata(nodeId=1, host='HOST', port=PORT, rack=None)
kafka.coordinator - INFO - Discovered coordinator 1 for group GROUPID1
kafka.coordinator - INFO - Skipping heartbeat: no auto-assignment or waiting on rebalance
kafka.coordinator.consumer - ERROR - Offset commit failed: This is likely to cause duplicate message delivery
Traceback (most recent call last):
File "/path/python3.5/site-packages/kafka/coordinator/consumer.py", line 407, in _maybe_auto_commit_offsets_sync
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
File "/path/python3.5/site-packages/kafka/coordinator/consumer.py", line 398, in commit_offsets_sync
raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
kafka.coordinator.consumer - INFO - Revoking previously assigned partitions {TopicPartition(topic='TOPIC1', partition=0)} for group GROUPID1

最佳答案

似乎您需要调整您的消费者配置,很可能查看日志似乎消费者的心跳 session 即将到期并且由于 session 已过期而无法提交最后轮询的记录并且它正在触发重新平衡因此它将从最后未提交的记录再次轮询

配置检查 -

  • heartbeat.interval.ms
  • session.timeout.ms
  • max.poll.interval.ms

关于python - Kafka 一遍又一遍地重放消息 - 心跳 session 已过期 - 标记协调器已死,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46497352/

66 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com