gpt4 book ai didi

java - KafkaConsumer 在 KafkaServer 上出错(版本 0.9.0.1)

转载 作者:太空宇宙 更新时间:2023-11-04 12:48:24 25 4
gpt4 key购买 nike

我正在尝试使用 Kafka-Client 库(0.9.0.1)测试生产者、消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了KafkaProducer,没有问题。但是当我测试 KafkaConsumer 进行轮询时,代理会发出错误消息。

[2016-03-18 13:44:19,129] ERROR Closing socket for /172.26.132.149 because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 10 at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:57) at kafka.network.RequestChannel$Request.(RequestChannel.scala:53) at kafka.network.Processor.read(SocketServer.scala:353) at kafka.network.Processor.run(SocketServer.scala:245)

消费者测试代码如下。

class ConsumerRunner implements Runnable{
private KafkaConsumer<String,String> consumer;
private String topic;
public ConsumerRunner(String topic,Properties props){
consumer = new KafkaConsumer<String,String>(props);
this.topic = topic;
consumer.subscribe(Arrays.asList(this.topic));
}
public void run() {
while(true){
ConsumerRecords<String,String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}

}

我猜测轮询请求包含错误的请求类型键,但是当我检查Kafka核心源时,我意识到请求类型键“10”被定义为“GroupCoordinatorKey”。我在“kafka.network.RequestChannel.scala”中发现可疑代码

   val requestObj =
if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
RequestKeys.deserializerForKey(requestId)(buffer)

else
null

测试消费者也显示错误消息

java.io.EOFException: null at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:180) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) at com.medialog.mdt.kafka.KafkaTest$ConsumerThread.run(KafkaTest.java:61)

有人有想法吗?是我的问题吗?或者其他?请帮我。谢谢。

最佳答案

明科

不确定您是否只是想为 0.9 Kafka 代码创建一个消费者,或者您的 kafka 消息是否有导致此问题的特定内容,您能否分享更多详细信息。

但是如果您只是想为 0.9 编写一个 Kafka 消费者,那么在 Kafka 0.9 中会有新的消费者 API。如果您愿意使用新的消费者 API,请查看此示例 https://github.com/sdpatil/KafkaAPIClient/blob/master/src/main/java/com/spnotes/kafka/simple/Consumer.java示例。

苏尼尔

关于java - KafkaConsumer 在 KafkaServer 上出错(版本 0.9.0.1),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36077492/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com