gpt4 book ai didi

apache-kafka - 卡夫卡 - org.apache.kafka.common.errors.NetworkException

转载 作者:行者123 更新时间:2023-12-04 04:43:27 25 4
gpt4 key购买 nike

我有一个 kafka 客户端代码,它连接到 Kafka(服务器 0.10.1,客户端是 0.10.2)代理。代码中有 2 个主题和 2 个不同的消费者组,还有一个生产者。偶尔从生产者代码中获取 NetworkException(2 天一次,5 天一次,......)。我们在两个消费者组的日志中看到消费者组(重新)加入信息,然后是来自生产者 future.get() 调用的 NetworkException。不知道为什么我们会收到这个错误。

代码:-

final Future<RecordMetadata> futureResponse = 
producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();

异常(exception):-
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

NetworkException 的 Kafka API 定义,

"A misc. network-related IOException occurred when making a request. This could be because the client's metadata is out of date and it is making a request to a node that is now dead."



谢谢

最佳答案

我在测试 Kafka Consumer 时遇到了同样的错误。我为此使用了发件人模板。
在消费者配置中,我另外设置了以下属性:

 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

发送消息后,我添加了一个线程 sleep :
   ListenableFuture<SendResult<String, String>> future =
senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

Thread.Sleep(10000).

有必要使测试工作,但可能不适合您的情况。

关于apache-kafka - 卡夫卡 - org.apache.kafka.common.errors.NetworkException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49671604/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com