
java - Spark DirectStream problem


I am trying to create a Spark direct stream from Kafka, but I get an error when creating the directStream object:

The method createDirectStream in the type KafkaUtils is not applicable for the arguments (one of which is the HashMap I pass in).

On this line: JavaPairInputDStream directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

Full code:

package kafkatest2;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kafka010.*;

public class SparkStream {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // TODO: processing pipeline
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        Set<String> topics = Collections.singleton("topic5");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

Best answer

Your code uses the wrong StringDecoder. It should be kafka.serializer.StringDecoder, not org.apache.commons.codec.StringDecoder.

The corrected code is as follows:

package kafkatest2;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStream {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Broker list for the direct (receiver-less) Kafka connection
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        Set<String> topics = Collections.singleton("topic5");

        // kafka.serializer.StringDecoder matches the decoder type parameters expected here
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
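
For reference: the corrected code above uses the older spark-streaming-kafka (Kafka 0.8) direct API. The question also imports org.apache.spark.streaming.kafka010.*, which belongs to the separate spark-streaming-kafka-0-10 integration; that API has a different createDirectStream signature and configures deserializers through the consumer properties rather than decoder classes. A minimal sketch of the same stream against the 0.10 API, assuming the spark-streaming-kafka-0-10 dependency is on the classpath (the class name and group.id below are illustrative, not from the original post):

package kafkatest2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStream010 {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

        // The 0.10 consumer takes bootstrap.servers instead of metadata.broker.list,
        // and deserializer classes instead of decoder type parameters.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "kafka-sandbox-group"); // hypothetical group id
        kafkaParams.put("auto.offset.reset", "latest");

        Collection<String> topics = Arrays.asList("topic5");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Records arrive as ConsumerRecord objects rather than Tuple2 pairs
        stream.foreachRDD(rdd -> rdd.foreach(record ->
                System.out.println(record.key() + " -> " + record.value())));

        ssc.start();
        ssc.awaitTermination();
    }
}

Whichever integration is chosen, only that one should be on the classpath and imported; mixing kafka and kafka010 classes makes it easy to resolve the wrong KafkaUtils or decoder and can lead to the same kind of "not applicable for the arguments" compile error.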

Hope this helps!

Regarding java - Spark DirectStream problem, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46466471/
