gpt4 book ai didi

java - 如何进行最高效的 Kafka 健康检查控制?

转载 作者:太空宇宙 更新时间:2023-11-04 09:29:13 29 4
gpt4 key购买 nike

我使用spring kafka。我想确定 Kafka 已关闭。 kafka有这样的实现吗?我找到了各种解决方案。但我发现的所有解决方案都必须编写计划作业。有没有一种方法可以显示kafka已关闭而无需编写计划作业?

我能找到的解决方案:

由于

中公开的原因,此解决方案已恢复

https://github.com/spring-projects/spring-boot/issues/12225

https://github.com/spring-projects/spring-boot/issues/12222

https://github.com/spring-projects/spring-boot/pull/12200

所以,我不想使用这个解决方案。

@Configuration
public class KafkaConfig {

@Autowired
private KafkaAdmin admin;

@Autowired
private MeterRegistry meterRegistry;

@Autowired
private Map<String, KafkaTemplate<?, ?>> kafkaTemplates;

@Bean
public AdminClient kafkaAdminClient() {
return AdminClient.create(admin.getConfig());
}

@SuppressWarnings("deprecation") // Can be avoided by relying on Double.NaN for non doubles.
@PostConstruct
private void initMetrics() {
final String kafkaPrefix = "kafka.";
for (Entry<String, KafkaTemplate<?, ?>> templateEntry : kafkaTemplates.entrySet()) {
final String name = templateEntry.getKey();
final KafkaTemplate<?, ?> kafkaTemplate = templateEntry.getValue();
for (Metric metric : kafkaTemplate.metrics().values()) {
final MetricName metricName = metric.metricName();
final Builder<Metric> gaugeBuilder = Gauge
.builder(kafkaPrefix + metricName.name(), metric, Metric::value) // <-- Here
.description(metricName.description());
for (Entry<String, String> tagEntry : metricName.tags().entrySet()) {
gaugeBuilder.tag(kafkaPrefix + tagEntry.getKey(), tagEntry.getValue());
}
gaugeBuilder.tag("bean", name);
gaugeBuilder.register(meterRegistry);
}
}
}

@Bean
public HealthIndicator kafkaHealthIndicator() {
final DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(1000);
final AdminClient adminClient = kafkaAdminClient();
return () -> {
final DescribeClusterResult describeCluster = adminClient.describeCluster(describeClusterOptions);
try {
final String clusterId = describeCluster.clusterId().get();
final int nodeCount = describeCluster.nodes().get().size();
return Health.up()
.withDetail("clusterId", clusterId)
.withDetail("nodeCount", nodeCount)
.build();
} catch (InterruptedException | ExecutionException e) {
return Health.down()
.withException(e)
.build();
}
};

}
}

我发现的另一个解决方案:

public class KafkaHealthIndicator implements HealthIndicator {
private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

private KafkaTemplate<String, String> kafka;

public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
this.kafka = kafka;
}

/**
* Return an indication of health.
*
* @return the health for
*/
@Override
public Health health() {
try {
kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Health.down(e).build();
}
return Health.up().build();
}
}

在两个解决方案中,我必须使用计划作业。有没有一种方法可以显示kafka已关闭而无需编写预定作业?如果没有可用的方法,第二个解决方案足以显示kafka已关闭?

最佳答案

据我所知,所有 Spring 健康检查都是计划检查。

我信任您在其他项目中放入的第一个代码块,因为它依赖于健康的 Kafka Controller 来返回 Activity 代理列表。我建议的一项补充是在类中添加一个字段,用于表示您希望成为 nodeCount

一部分的代理的最小数量。

关于java - 如何进行最高效的 Kafka 健康检查控制?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57294453/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com