
java - Spring Boot : Kafka health indicator

Repost. Author: 行者123. Updated: 2023-12-04 08:34:24

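I have something like the code below, and it works well, but I would prefer to check health without sending any message (and not only by checking the socket connection). I know Kafka has something like KafkaHealthIndicator out of the box. Does anyone have experience with it, or an example?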

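import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaHealthIndicator implements HealthIndicator {
    private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

    private final KafkaTemplate<String, String> kafka;

    public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
        this.kafka = kafka;
    }

    @Override
    public Health health() {
        try {
            // Send a probe message and wait up to 100 ms for the broker to acknowledge it.
            kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Health.down(e).build();
        }
        return Health.up().build();
    }
}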

Best answer

kafkaAdminClient.describeCluster(..) is the place to test Kafka availability.

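import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
        final DescribeClusterOptions options = new DescribeClusterOptions()
                .timeoutMs(1000);

        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                // describeCluster() is asynchronous and returns futures, so a failure
                // (timeout or broker error) surfaces only when a future is awaited.
                // The resulting exception marks this indicator as DOWN.
                kafkaAdminClient.describeCluster(options).clusterId().get();

                builder.up();
            }
        };
    }

}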
For a more detailed probe, add:
    DescribeClusterResult clusterDesc = kafkaAdminClient.describeCluster(options);
    builder.up()
            .withDetail("clusterId", clusterDesc.clusterId().get())
            .withDetail("nodeCount", clusterDesc.nodes().get().size())
            .build();
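
Both approaches in this thread rely on asynchronous calls: kafka.send(...) and the results of describeCluster(...) hand back futures, so a broker failure is reported only when a future is awaited with get(...). The following is a minimal sketch of that pattern using only the JDK, with no Kafka dependency. The class FutureProbe, the method probe, and the simulated futures are hypothetical names made up for illustration; they are not part of Kafka or Spring.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not a Kafka or Spring API.
final class FutureProbe {

    // Returns "UP" only if the future completes normally within the timeout.
    static String probe(Future<?> pending, long timeoutMs) {
        try {
            // Any failure surfaces here, not at the point where the future was created.
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "UP";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "DOWN";
        } catch (ExecutionException | TimeoutException e) {
            return "DOWN";
        }
    }

    public static void main(String[] args) {
        // Simulated outcomes standing in for a broker response (assumptions for the demo).
        CompletableFuture<String> reachable = CompletableFuture.completedFuture("cluster-1");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));
        CompletableFuture<String> neverAnswers = new CompletableFuture<>();

        System.out.println(probe(reachable, 100));    // completes normally
        System.out.println(probe(failed, 100));       // completed exceptionally
        System.out.println(probe(neverAnswers, 100)); // times out after 100 ms
    }
}
```

Creating any of these futures throws nothing by itself. Only the get(...) call inside probe turns a failed or stalled request into an exception, which is why a health check needs to await the future before reporting UP.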

For java - Spring Boot : Kafka health indicator, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/64869177/
