gpt4 book ai didi

python - 手动提交偏移量到kafka主题的正确方法是什么

转载 作者:太空宇宙 更新时间:2023-11-03 21:17:33 33 4
gpt4 key购买 nike

我有一个消费者脚本,它处理每条消息并手动向主题提交偏移量。

CONSUMER = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_SERVER],
auto_offset_reset="earliest",
max_poll_records=100,
enable_auto_commit=False,
group_id=CONSUMER_GROUP,
# Use the RoundRobinPartition method
partition_assignment_strategy=[RoundRobinPartitionAssignor],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

while True:
count += 1
LOGGER.info("--------------Poll {0}---------".format(count))
for msg in CONSUMER:
# Process msg.value
# Commit offset to topic
tp = TopicPartition(msg.topic, msg.partition)
offsets = {tp: OffsetAndMetadata(msg.offset, None)}
CONSUMER.commit(offsets=offsets)

处理每条消息所需的时间 < 1 秒。

我收到此错误错误:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.


Process finished with exit code 1

期望:

a) 如何修复此错误?

b) 如何确保我的手动提交正常工作?

c) 提交偏移量的正确方法。

我已经经历过这个但是Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions为了理解我的问题,我们非常感谢任何有关调整轮询、 session 或心跳时间的帮助。

Apache 卡夫卡:2.11-2.1.0卡夫卡 python :1.4.4

最佳答案

消费者的

session.timeout.ms应小于Kafka代理上的group.max.session.timeout.ms

关于python - 手动提交偏移量到kafka主题的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54569082/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com