gpt4 book ai didi

java - Kafka 新消费者 : (re)set and commit offsets with using assign, 未订阅

转载 作者:行者123 更新时间:2023-11-30 08:02:50 25 4
gpt4 key购买 nike

使用新的 Kafka Java 消费者 API,我运行一个消费者来消费消息。当所有可用消息都被消耗后,我使用 kill -15 将其终止。

现在我想重置偏移量以开始。我想避免只使用不同的消费者群体。我尝试的是以下调用序列,使用与刚读完数据的消费者相同的组。

assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

我以为我已经在测试中成功了,但现在我总是得到:

ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)

assigncommitSync 组合原则上是否错误,可能是因为只有 subscribecommitSync 一起使用?文档只说 assignsubscribe 不一致,但我认为这仅适用于一个消费者进程。 (事实上​​ ,我什至希望在另一个消费者启动时运行偏移量重置消费者,希望另一个消费者可能会注意到偏移量变化并重新开始。但先关闭它也可以。)

有什么想法吗?

最佳答案

发现问题。如果我们尊重以下条件,我的问题中描述的方法效果很好:

  1. 可能没有其他消费者使用目标 group.id 运行。即使消费者仅订阅了其他主题,这也会阻碍在调用 assign() 而不是 subscribe() 之后提交主题偏移量。

  2. 在最后一个其他消费者停止后,需要30秒(我认为是group.max.session.timeout.ms)才能操作成功。来自kafka的指示性日志消息是

    Group X generation Y is dead and removed

一旦这出现在日志中,序列

assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

可以成功。

关于java - Kafka 新消费者 : (re)set and commit offsets with using assign, 未订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36579827/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com