gpt4 book ai didi

java - Kafka AdminClient 尝试获取消费者的滞后时遇到问题

转载 作者:行者123 更新时间:2023-12-02 02:44:49 25 4
gpt4 key购买 nike

我一直在尝试使用 AdminClient 来获取消费者的滞后,但是 adminClient.listGroupOffsets("foo");返回一个空指针 NullPointerException。 这是我的代码:

public long getLag() {
AdminClient adminClient = AdminClient.createSimplePlaintext("localhost:9092");
scala.collection.immutable.Map<TopicPartition, Object> offsets = adminClient.listGroupOffsets("foo");
Option<Object> offset = offsets.get(new TopicPartition("test", 0));
TopicPartition topicPartition = new TopicPartition("test", 0);
return getLogEndOffset(topicPartition)- Long.parseLong(offset.get().toString());

}

private long getLogEndOffset(TopicPartition tp) {
KafkaConsumer consumer = createNewConsumer();
Collections.singletonList(tp);
consumer.assign(Collections.singletonList(tp));
consumer.seekToEnd(Collections.singletonList(tp));
return consumer.position(tp);
}

private KafkaConsumer createNewConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer(properties);
}

最佳答案

listGroupOffsets api 在 2.12 中引入。可能的原因是版本与代理版本不匹配。

代理仅支持 OffsetFetchRequest v1,但您需要 v2 或更高版本才能请求所有主题分区。

关于java - Kafka AdminClient 尝试获取消费者的滞后时遇到问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44783002/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com