
apache-kafka - Error reading field 'topics': java.nio.BufferUnderflowException in Kafka

Reposted — Author: 行者123 · Updated: 2023-12-03 16:06:12

My 0.9.0 client consumes messages from two brokers running on a remote system. My producer works fine and can send messages to the brokers, but my consumer cannot consume them. The consumer and producer run on my local system; both brokers are on AWS.
Whenever I try to run the consumer, the following error appears in the broker log:

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)

My consumer code is as follows:
package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1");
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP1:9092,IP2:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put("heartbeat.interval.ms", "500");
        props.put("session.timeout.ms", "1000");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(a));
        while (true) {
            // Poll for ConsumerRecords for a certain amount of time
            ConsumerRecords<String, String> records = consumer.poll(1000);

            // Process the ConsumerRecords, if any, that came back
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key + ":" + value);
                // Do something with message
            }
        }
    }
}

Can someone help?

Best Answer

This problem occurs when the Kafka cluster running on your machines is an older version (i.e., 0.8.x.x) while the client used to access data from the cluster is a newer version (i.e., 0.9.x.x).
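The exception itself just means the broker tried to read more bytes from the request buffer than it actually contained: an 0.8 broker parses the 0.9 client's JoinGroupRequest with an older schema, and the wire-format mismatch makes it read past the end of the payload. A minimal standalone illustration of the underlying java.nio behavior (plain JDK code, not Kafka code):

```java
import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;

public class UnderflowDemo {
    public static void main(String[] args) {
        // A buffer that holds only 2 bytes of payload...
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.put((byte) 1).put((byte) 2);
        buf.flip();

        try {
            // ...but the reader's schema expects a 4-byte int: underflow.
            buf.getInt();
        } catch (BufferUnderflowException e) {
            System.out.println("caught: " + e.getClass().getName());
        }
    }
}
```

This is exactly what happens inside `Schema.read` in the stack trace above, except there the "schema" is the broker's older request format.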

There are two simple solutions, depending on your requirements:

  • Downgrade the client version.
  • Upgrade the Kafka cluster.
  • A similar question on Stack Overflow about this apache-kafka error reading field 'topics' (java.nio.BufferUnderflowException): https://stackoverflow.com/questions/37564784/
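If you go the downgrade route with Maven, pin `kafka-clients` to the same release line as the broker. A sketch, assuming a hypothetical 0.8.2.2 broker (substitute your actual broker version):

```xml
<!-- Match the client library to the broker version; 0.8.2.2 is an assumed example -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
</dependency>
```

Note one caveat with downgrading: the `KafkaConsumer` API used in the question's code was introduced in 0.9, so an 0.8.x client would also require rewriting the consumer against the old high-level consumer API. Upgrading the brokers to 0.9.x lets the posted code run unchanged, which is usually the less invasive fix.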
