
spring-boot - Spring Boot Kafka - Kafka metrics not available in /actuator/prometheus

Reposted. Author: 行者123. Updated: 2023-12-02 02:19:02

I want to monitor Kafka metrics, but nothing Kafka-related shows up under the /actuator/prometheus endpoint. Is something missing from my setup?

Application dependencies: Kotlin 1.4.31, Spring Boot 2.3.9, Spring Kafka 2.6.7, Reactor Kafka 1.2.5, Kafka Clients 2.5.1

Application configuration:

    management:
      server:
        port: 8081
      endpoints:
        web:
          exposure:
            include: health,info,metrics,prometheus

    spring:
      jmx:
        enabled: true
      kafka:
        bootstrap-servers: ...
        consumer:
          group-id: my-service
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual
        ssl:
          key-store-location: ...
          key-store-password: ...
        security:
          protocol: SSL

My receiver looks like this:

    @Bean
    fun someEventReceiver(): SomeEventReceiver =
        KafkaReceiver.create(
            ReceiverOptions.create<String, SomeEvent>(kafkaProperties.buildConsumerProperties())
                .withValueDeserializer(SomeEvenDeserializer())
                .subscription(listOf(serviceProperties.kafka.topics.someevent))
        )

And the listener:

    @EventListener(ApplicationStartedEvent::class)
    fun onSomeEvent() {
        someEventReceiver
            .receive()
            .groupBy { it.receiverOffset().topicPartition() }
            .publishOn(Schedulers.boundedElastic())
            .flatMap { someEvent ->
                someEvent
                    .publishOn(Schedulers.boundedElastic())
                    .delayUntil(::handleEvent)
                    .doOnNext { it.receiverOffset().acknowledge() }
                    .retryWhen(Retry.backoff(10, Duration.ofMillis(100)))
            }
            .retryWhen(Retry.indefinitely())
            .subscribe()
    }

Best answer

Unlike spring-kafka, reactor-kafka does not currently have any Micrometer integration.

If spring-kafka is also on the class path, you can use its MicrometerConsumerListener to bind a KafkaClientMetrics to the meter registry (or you can do the binding yourself).
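For the "do the binding yourself" option, a minimal sketch could look like the following. It assumes Micrometer's `KafkaClientMetrics` binder (Micrometer 1.4 or later) is on the class path. The class name `ReceiverMetricsBinder`, both method names, and the choice of a `spring.id` tag (picked to mirror the label that MicrometerConsumerListener produces) are illustrative, not part of either library.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;

// Hypothetical helper: binds a Kafka consumer's client metrics to a MeterRegistry.
public final class ReceiverMetricsBinder {

    private ReceiverMetricsBinder() {
    }

    // Binds the metrics of a plain Kafka consumer; the "spring.id" tag is an illustrative choice.
    public static KafkaClientMetrics bindConsumer(Consumer<?, ?> consumer, MeterRegistry registry, String id) {
        KafkaClientMetrics metrics = new KafkaClientMetrics(consumer, List.of(Tag.of("spring.id", id)));
        metrics.bindTo(registry);
        return metrics;
    }

    // Asks the receiver for its underlying consumer and binds its metrics.
    // Nothing happens until the returned Mono is subscribed.
    public static <K, V> Mono<KafkaClientMetrics> bind(KafkaReceiver<K, V> receiver, MeterRegistry registry,
            String id) {
        return receiver.doOnConsumer(consumer -> bindConsumer(consumer, registry, id));
    }
}
```

A caller would subscribe to the returned Mono once the receive() pipeline has been subscribed, and would probably want to keep the emitted KafkaClientMetrics so it can be closed when the receiver is disposed.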

Here is an example using the Spring listener:

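    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.MicrometerConsumerListener;

    import io.micrometer.core.instrument.MeterRegistry;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    @SpringBootApplication
    public class So66706766Application {

        public static void main(String[] args) {
            SpringApplication.run(So66706766Application.class, args);
        }

        @Bean
        ApplicationRunner runner(MicrometerConsumerListener<String, String> consumerListener) {
            return args -> {
                ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                ConsumerConfig.GROUP_ID_CONFIG, "so66706766"))
                        .withKeyDeserializer(new StringDeserializer())
                        .withValueDeserializer(new StringDeserializer())
                        .subscription(Collections.singletonList("so66706766"));
                KafkaReceiver<String, String> receiver = KafkaReceiver.create(ro);
                // Start consuming: print each value and acknowledge its offset.
                receiver.receive()
                        .doOnNext(rec -> {
                            System.out.println(rec.value());
                            rec.receiverOffset().acknowledge();
                        })
                        .subscribe();
                // Hand the underlying consumer to the listener so its metrics are bound to the registry.
                receiver.doOnConsumer(consumer -> {
                    consumerListener.consumerAdded("myConsumer", consumer);
                    return Mono.empty();
                }).subscribe();
            };
        }

        @Bean
        MicrometerConsumerListener<String, String> consumerListener(MeterRegistry registry) {
            return new MicrometerConsumerListener<>(registry);
        }

        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so66706766").partitions(1).replicas(1).build();
        }

    }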

    # HELP kafka_consumer_successful_authentication_total The total number of connections with successful authentication
    # TYPE kafka_consumer_successful_authentication_total counter
    kafka_consumer_successful_authentication_total{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.0
    # HELP jvm_gc_live_data_size_bytes Size of long-lived heap memory pool after reclamation
    # TYPE jvm_gc_live_data_size_bytes gauge
    jvm_gc_live_data_size_bytes 0.0
    # HELP kafka_consumer_connection_creation_rate The number of new connections established per second
    # TYPE kafka_consumer_connection_creation_rate gauge
    kafka_consumer_connection_creation_rate{client_id="consumer-so66706766-1",kafka_version="2.6.0",spring_id="myConsumer",} 0.07456936193482637
    ...
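To check quickly whether any Kafka meters made it into the scrape, the endpoint output can be filtered for sample lines whose names start with `kafka_`. The sketch below is one way to do that with the JDK's HTTP client (Java 11 or later). The class name `KafkaMeterFilter` is hypothetical, and the URL assumes the application runs on localhost with the management port 8081 from the question's configuration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for inspecting a Prometheus scrape body.
public final class KafkaMeterFilter {

    private KafkaMeterFilter() {
    }

    // Returns the sample lines whose metric name starts with "kafka_".
    // "# HELP" and "# TYPE" comment lines are skipped because they start with '#'.
    public static List<String> kafkaSamples(String scrapeBody) {
        return scrapeBody.lines()
                .map(String::strip)
                .filter(line -> line.startsWith("kafka_"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Assumption: localhost and management port 8081, as in the question's configuration.
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://localhost:8081/actuator/prometheus"))
                .build();
        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        List<String> samples = kafkaSamples(body);
        if (samples.isEmpty()) {
            System.out.println("No kafka_* samples found");
        } else {
            samples.forEach(System.out::println);
        }
    }
}
```

An empty result with a running consumer suggests that no Kafka metrics binder has been registered for it.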

I opened an issue: https://github.com/reactor/reactor-kafka/issues/206

Regarding "spring-boot - Spring Boot Kafka - Kafka metrics not available in /actuator/prometheus", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66706766/
