
java - Kafka - Stop retrying on ConnectException

Reposted · Author: 搜寻专家 · Updated 2023-10-31 20:31:49

I have a Kafka producer defined like this:

public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
    this.kafkaTopic = map;
    final Properties properties = new Properties();
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
    producer = new KafkaProducer<String, String>(properties);
}

I am sending messages with the following code (I also tried it with a callback):

public void sendMessage(String topic, RestCommonResource resultToken) {
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.valueToTree(resultToken);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
    producer.send(record);
}
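For reference, the callback variant mentioned above looks like the sketch below (it uses the standard `Callback` interface from kafka-clients, so it is not runnable standalone). Note that the callback only fires after the producer's own retries give up, so by itself it does not break the reconnect loop:

```java
// Sketch only -- requires kafka-clients on the classpath.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Invoked only after the producer has exhausted its internal retries.
            System.err.println("Send failed, dropping record: " + exception);
        }
    }
});
```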

But if the Kafka server is down when the producer publishes a message, the program gets stuck in an infinite loop, repeatedly throwing the following exception:

WARN  [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]

Is there any property that can be set to stop retrying and drop the message?

Best Answer

Currently if a Kafka client loses a connection with brokers it will wait for reconnect.backoff.ms milliseconds before attempting to reconnect.

While this strategy works well when a client is disconnected for a short time, if a single broker or the entire cluster becomes unavailable for a long time, all clients will quickly generate a large number of connection attempts.

In addition, developers have limited control over a client which constantly loses its connections with the cluster.

I think this topic will be useful to you: Add custom policies for reconnect attempts to NetworkClient

reconnect.backoff.ms : The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

reconnect.backoff.max.ms : The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
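The two backoff settings quoted above are set like any other client property. A minimal sketch (the values here are illustrative, not recommendations):

```java
import java.util.Properties;

public class ReconnectBackoffConfig {

    // Spaces out reconnect attempts: start at 1s between attempts and back off
    // exponentially (the client adds jitter itself) up to a 30s cap.
    public static Properties props() {
        Properties p = new Properties();
        p.put("reconnect.backoff.ms", "1000");      // base wait before reconnecting to a host
        p.put("reconnect.backoff.max.ms", "30000"); // cap on the exponential backoff
        return p;
    }

    public static void main(String[] args) {
        System.out.println(props());
    }
}
```

This slows the reconnect storm down but does not stop it; dropping messages outright needs the send-level limits discussed below the question.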

Regarding java - Kafka - Stop retrying on ConnectException, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52307867/
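To answer the question directly: beyond reconnect backoff, the producer's own retry behaviour can be bounded so that a send fails fast and the message can be dropped. A sketch, assuming a reasonably recent client (`retries` already exists in 0.8.2.x, but `max.block.ms` and `delivery.timeout.ms` were added in later releases, so check your client version; the timeout values below are illustrative):

```java
import java.util.Properties;

public class FailFastProducerConfig {

    // Builds producer properties that give up quickly instead of retrying
    // forever. All property names are standard Kafka producer configs.
    public static Properties props(String kafkaHost, String kafkaPort) {
        Properties p = new Properties();
        p.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("retries", "0");                 // do not retry a failed send
        p.put("max.block.ms", "5000");         // fail send() fast when brokers are unreachable
        p.put("request.timeout.ms", "5000");   // per-request timeout
        p.put("delivery.timeout.ms", "10000"); // overall bound on reporting success/failure (client >= 2.1);
                                               // must be >= request.timeout.ms + linger.ms
        return p;
    }

    public static void main(String[] args) {
        System.out.println(props("localhost", "9092"));
    }
}
```

With these limits, a send against a down cluster fails with a `TimeoutException` (delivered via the returned `Future` or the callback) instead of retrying indefinitely, and the caller can choose to drop the message.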
