gpt4 book ai didi

apache-kafka - 如何删除一个特定主题的组的消费者偏移量

转载 作者:行者123 更新时间:2023-12-05 04:55:44 26 4
gpt4 key购买 nike

假设我有两个主题(都有两个分区和无限保留):

  • my_topic_a
  • my_topic_b

和一个消费者组:

  • 我的消费者

在某些时候,它同时消耗了两个主题,但由于一些变化,它不再对 my_topic_a 感兴趣,所以它停止消耗它并且现在正在累积滞后:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -

这种延迟让我很烦,因为:

  • 我在 Grafana 中的消费者滞后图被污染了。
  • 触发了自动警报,提醒我消费者滞后太多。

因此我想摆脱 my_consumermy_topic_a 的偏移量,以达到 my_consumer 从未消费过的状态 my_topic_a

以下尝试失败:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user

有了这个输出:

The consumer does not support topic-specific offset deletion from a consumer group.

我怎样才能实现我的目标? (暂时停止该组的所有消费者在我的用例中是一个可行的选择。)

(我使用的是 Kafka 版本 2.2.0。)


我的猜测是,可以通过为主题 __consumer_offsets 写一些东西来完成一些事情,但我不知道它会是什么。目前,该主题如下所示(再次简化):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)

最佳答案

与此同时(Kafka 2.8),kafka-consumer-groups.sh 的新--delete-offsets 参数已成为可能。 :-)

关于apache-kafka - 如何删除一个特定主题的组的消费者偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65239579/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com