gpt4 book ai didi

java - Kafka生产者无法向服务器发送数据

转载 作者:太空宇宙 更新时间:2023-11-04 11:56:26 26 4
gpt4 key购买 nike

这是我的代码。我能够创建主题,但由于某种原因无法在主题内发送数据。很长一段时间后我收到这些错误。我正在使用kafka版本2.11-0.8.2.1

org.apache.kafka.clients.producer.KafkaProducer$FutureFailure@5474c6c
org.apache.kafka.clients.producer.KafkaProducer$FutureFailure@4b6995df

这是kafka的server.log文件

[2016-12-27 21:05:54,873] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(Unknown Source)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.read(Unknown Source)
at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Unknown Source)
[2016-12-27 21:07:54,727] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2016-12-27 21:16:08,559] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

这是我将整数发送到kafka系统的java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("timeout.ms", "50");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 2; i++)
System.out.println(producer.send(new ProducerRecord<String, String>("testtopic", Integer.toString(i),
Integer.toString(i))).toString());

producer.close();

这是 pom.xml

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

最佳答案

没有什么特别突出的

props.put("timeout.ms", "50");

请求超时应该大于默认轮询间隔,在 Kafka 中默认轮询间隔为 5 分钟。所以我想如果将其保留为默认值(略高于 5 分钟),它应该可以工作。

关于java - Kafka生产者无法向服务器发送数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41354526/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com