gpt4 book ai didi

java - 如何在不解析日志文件的情况下获取Kafka连接状态?能够对 "Connection to node terminated during authentication."采取行动

转载 作者:行者123 更新时间:2023-11-30 10:15:00 26 4
gpt4 key购买 nike

当使用不正确的凭据进行身份验证时,我在身份验证期间收到以下预期消息。

[Consumer clientId=consumer-1, groupId=xxx] Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

这很好,但现在我只能在日志文件中找到此信息,而我想对此消息采取措施。

我还查看了此消息源自的 Kafka 源代码:

(在 1.1 中)https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java(或在后备箱中)https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java )

有一段代码是这样的:

switch (disconnectState.state()) {
case AUTHENTICATION_FAILED:
connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
break;
case AUTHENTICATE:
// This warning applies to older brokers which dont provide feedback on authentication failures
log.warn("Connection to node {} terminated during authentication. This may indicate " +
"that authentication failed due to invalid credentials.", nodeId);
break;

重要说明:我使用基于 SASL_PLAIN 的自定义模块重写了 Kafka 中的身份验证机制,当我只是在调试时,我意识到在身份验证期间此模块中的其他内容失败,导致与此不同的异常模块,这意味着它触发了这个“老经纪人”代码路径。但是,我还是想对这个状态采取一些行动。

我试过像这样创建一个监听器:

    final ContainerProperties containerProperties = new ContainerProperties("TEST_TOPIC");
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
containerProperties.setMessageListener((AcknowledgingMessageListener<GenericRecord, GenericRecord>) (record, ack) -> logger.info("Got record on test topic"));
KafkaMessageListenerContainer<GenericRecord, GenericRecord> container = new KafkaMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
container.start();

但代码并没有抛出任何身份验证异常,它只是在日志中不断打印上述消息。

我也试过创建一个简单的消费者,像这样:

    Consumer<GenericRecord, GenericRecord> consumer = kafkaConsumerFactory.createConsumer();
consumer.subscribe(Collections.singleton("BX_TEST_TOPIC"));
try {
ConsumerRecords<GenericRecord, GenericRecord> poll = consumer.poll(10);
}
catch (Exception e) {
logger.info("Exception during polling", e);
}

但是这段代码也没有抛出任何异常,它也只是一直在日志中打印消息。尽管根据 https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long- poll 方法可能会抛出 AuthenticationException。这可能是因为“旧代理”类型的消息,在源代码中仅在日志中显示警告。

那么,如果服务器没有返回身份验证异常但仍然无法登录,如何实际捕获任何 AuthenticationException 或以某种有意义的方式获取连接状态而不是解析日志?

请注意,当身份验证不是问题但其他原因失败时,会出现完全相同的问题。例如,有时我的 Kafka 集群无法正确启动,在这种情况下我会收到以下消息:

Connection to node -1 could not be established. Broker may not be available.

我也无法检查。相反,当客户端进入某个无限循环尝试连接时,日志只会不断填满此消息。

最佳答案

您可以通过增加重试退避来减少循环。

您可以使用类似...的方式检查代理的状态

spring.kafka.producer.properties.retry.backoff.ms=1000
spring.kafka.producer.properties.max.block.ms=10000
spring.kafka.bootstrap-servers=localhost:9096

@Bean
public ApplicationRunner runner(ProducerFactory<?, ?> producerFactory) {
return args -> {
try (Producer<?, ?> producer = producerFactory.createProducer()) {
producer.partitionsFor("foo");
}
catch (Exception e) {
e.printStackTrace();
}
};
}

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.

我减少了 max.block.ms,因为它默认为 60 秒。

关于java - 如何在不解析日志文件的情况下获取Kafka连接状态?能够对 "Connection to node terminated during authentication."采取行动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50674491/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com