
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord in Kafka


I am a beginner with Apache Kafka. The following code samples are my Kafka producer and consumer.

Kafka producer code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] args) throws Exception {
    String inputTopic = "inputTopic";
    String broker = "localhost:9092";

    Properties properties = new Properties();
    properties.put("bootstrap.servers", broker);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(properties);
    String message = " >>>>>>>>>>>>>>>>>>>>> Data Message";
    int key = 0;

    while (key < 1) {
        key = key + 1;

        ProducerRecord<String, String> record =
                new ProducerRecord<>(inputTopic, String.valueOf(key), (message + " " + key));
        producer.send(record).get(); // get() blocks until the broker acknowledges the write
    }

    producer.close();
}

The next snippet is the Kafka consumer, a Spark Streaming job that reads from the same topic.

Kafka consumer code:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleDStreamExample");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    Collection<String> topics = Arrays.asList("inputTopic");

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark-demo");
    // note: "kafka.consumer.id" is not a standard consumer config; the client ignores unknown keys
    kafkaParams.put("kafka.consumer.id", "kafka-consumer-01");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);

    // 1-second micro-batches
    JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));

    JavaInputDStream<ConsumerRecord<String, String>> inputStream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    inputStream.print();
    ssc.start();
    ssc.awaitTermination();
    ssc.close();
}

But the Kafka consumer throws the following exception:

19/09/16 20:57:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
19/09/16 20:57:58 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2); not retrying
19/09/16 20:57:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

I cannot work out what causes this exception. The producer clearly sends <String, String> messages, so why is the record on the consumer side not serializable?

Best Answer

Comparing your code with the example in the documentation: you never extract any data from the ConsumerRecord. ConsumerRecord itself does not implement java.io.Serializable, and print() ships the first few records of each batch from the executors to the driver, which requires serializing them; that is exactly the step failing in your stack trace. Map each record to its key and value (plain, serializable Strings) before printing:

JavaPairDStream<String, String> outStream =
        inputStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
outStream.print();
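
If only the payload matters, mapping each record to its value alone works just as well. A minimal sketch against the same inputStream variable from the question:

// Keep only the String payload so nothing non-serializable reaches the driver
JavaDStream<String> values = inputStream.map(ConsumerRecord::value);
values.print();

Either way the rule is the same: whatever flows into a driver-side action such as print() or collect() must be serializable, so pull the fields you need out of the ConsumerRecord first.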

Regarding java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord in Kafka, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/57956511/
