gpt4 book ai didi

java - Spring Kafka Consumer - 打印 Kafka 滞后信息

转载 作者:行者123 更新时间:2023-11-29 04:25:39 34 4
gpt4 key购买 nike

我创建了一个读取主题的 spring kafka 消费者。有没有办法像我们打印分区信息那样打印滞后信息?

最佳答案

虽然没有提供源代码,但我假设您通过 @KafkaListener 注释实现了您的消费者。我已经克服了您使用 org.apache.kafka.clients.consumer.Consumer 接口(interface)描述的相同问题,如 here 所述.它可以在 @KafkaListener 注释下声明为消费者方法中的参数。此接口(interface)提供 metrics() 方法,该方法包含存储在 records-max-lag 属性中的消费者滞后信息。

private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);

@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
Consumer<?, ?> consumer) {



String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));


LOGGER.info(lag);


}

在这种情况下,我明确选择了 records-lag-max 属性。您可以选择任何其他消费者指标,列表位于 Confluent Docs .

上面的代码片段将有以下输出:[Kafka 当前消费者滞后] X 条记录其中 X 是此窗口中任何分区的记录数方面的最大滞后。

重要:

我正在使用 Spring Kafka 库的 2.3.3.RELEASE 版本

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>

关于java - Spring Kafka Consumer - 打印 Kafka 滞后信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46492083/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com