gpt4 book ai didi

java - 无法从所有分区获取 Kafka 滞后

转载 作者:行者123 更新时间:2023-11-30 05:18:17 25 4
gpt4 key购买 nike

有没有办法找到分配给同一消费者组的所有消费者的整个 kafka 滞后?

我只能获取指定分区的延迟。例如假设只有一个分区分配给一个消费者,下面的代码只会给该分区带来延迟。不适用于其他分区。

Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
for(TopicPartition tp : partitionSet) {
LOG.info("Topic:{}, EndOffset:{}, currentOffset:{}, LAG:{}",
tp.topic(), endOffsets.get(tp), consumer.position(tp), endOffsets.get(tp)-consumer.position(tp));
}

基本上,想要找到所有分区的滞后总和,以了解某个主题的所有消费者(同一组)滞后了多少。

此外,是否有任何类似于 kafka-consumer-groups 的可用 api,并传递 bootstrap-server 和 group 作为参数来查找滞后?

./kafka-consumer-groups.sh --bootstrap-server --group --describe

最佳答案

以编程方式实现此目的的正确方法是使用 AdminClient API:

  1. 使用 listConsumerGroupOffsets() 获取该组的提交偏移量.

  2. 获取日志结束偏移量。目前您需要启动一个 Consumer 并调用 endOffsets()对于步骤 1 中检索到的所有分区。

    在 Kafka 2.5(预计 2020 年 2 月结束)中,有一个新的 AdminClient API 用于检索日志结束偏移量 listOffsets(),这样就可以单独使用 AdminClient 来检索延迟。

  3. 对于每个分区,从日志结束偏移量(步骤 2)中减去提交的偏移量(步骤 1)。

这基本上就是kafka-consumer-groups.sh在幕后所做的事情。所以检查implementation of this tool如果你愿意的话。

关于java - 无法从所有分区获取 Kafka 滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59988199/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com