recordClass"是什么?-6ren"> recordClass"是什么?-我正在尝试使用以下 API 通过 Kafka 进行 Spark 流处理。我必须使用 Spark 传输 avro 序列化数据,数据位于 Kafka 中。 static ,VD extends kafka-6ren">
gpt4 book ai didi

java - JavaInputDStream中的 "Class recordClass"是什么?

转载 作者:行者123 更新时间:2023-12-01 18:02:16 24 4
gpt4 key购买 nike

我正在尝试使用以下 API 通过 Kafka 进行 Spark 流处理。我必须使用 Spark 传输 avro 序列化数据,数据位于 Kafka 中。

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> 
JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.

我可以知道我需要为 API 中的参数类 recordClass 提供什么吗?我已经使用了如下所示的 API,但它给出了编译错误。

我想要的只是从 kafka 接收字节流数据到 Spark Streaming 中。

JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);

Exception in thread "main" java.lang.Error: Unresolved compilation problems: The method createDirectStream(JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,R>) in the type KafkaUtils is not applicable for the arguments (JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,String>)

最佳答案

试试这个。

 JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, fromOffset,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);

Here是一篇针对 Kafka、Avro 和 Spark 的文章。

关于java - JavaInputDStream中的 "Class<R> recordClass"是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39967026/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com