
java - Hadoop OutputFormat RuntimeException when running an Apache Spark Kafka stream

Reposted · Author: 可可西里 · Updated: 2023-11-01 15:02:06

I am running a program that uses Apache Spark to fetch data from an Apache Kafka cluster and write it to Hadoop files. My program is as follows:

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class SparkKafkaConsumer {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = "Topic1, Topic2, Topic3".split(",");
        for (String topic : topics) {
            topicMap.put(topic, 3);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(",".split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();
        wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt");

        jssc.start();
        jssc.awaitTermination();
    }
}

I am submitting the application with this command:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar

I am getting this error:

java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
    at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)

What is causing this error, and how can I fix it?

Best Answer

I agree that the error message is not very evocative, but it is generally better to specify the output data format explicitly in any of the saveAsHadoopFiles methods, precisely to protect yourself from this type of exception.
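For context, the exception is raised inside Hadoop's Configuration.setClass, which checks that the supplied class actually implements the expected interface. When the two-argument saveAsHadoopFiles overload is used, no OutputFormat class is supplied, which is why scala.runtime.Nothing$ ends up failing that check. A minimal plain-Java sketch of the check, using hypothetical stand-in types (Sink for OutputFormat, NotASink for scala.runtime.Nothing$):

```java
// Minimal sketch of the assignability check that produces the
// "class X not Y" RuntimeException. Sink and NotASink are hypothetical
// stand-ins for org.apache.hadoop.mapred.OutputFormat and scala.runtime.Nothing$.
public class SetClassSketch {
    interface Sink {}
    static class NotASink {}
    static class GoodSink implements Sink {}

    // Mirrors the test setClass performs before storing the class in the config.
    static void setClass(Class<?> theClass, Class<?> xface) {
        if (!xface.isAssignableFrom(theClass)) {
            throw new RuntimeException(theClass + " not " + xface.getName());
        }
    }

    public static void main(String[] args) {
        setClass(GoodSink.class, Sink.class); // passes: GoodSink implements Sink
        try {
            setClass(NotASink.class, Sink.class); // throws, like Nothing$ vs OutputFormat
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing a real output-format class, as the answer below does, is what makes this check succeed.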

Here is the prototype of that method from the documentation:

saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass)

In your example, this would correspond to:

wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class);

Based on the types in your wordCounts PairDStream, I chose Text, since the key is of type String, and IntWritable, since the value associated with each key is of type Integer.

If you only need basic plain-text files, use TextOutputFormat; you can also look at the subclasses of FileOutputFormat for more output options.
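As an illustration of what TextOutputFormat produces: each (key, value) record becomes the key's string form, a tab, then the value's string form, one record per line. A rough plain-Java sketch of that line format (illustrative only, not the Hadoop class itself):

```java
// Rough sketch of TextOutputFormat's default record format: key \t value,
// one record per line. The real writer lives in org.apache.hadoop.mapred.
public class TextLineSketch {
    static String formatRecord(Object key, Object value) {
        if (key == null) return String.valueOf(value);
        if (value == null) return String.valueOf(key);
        return key + "\t" + value; // default key/value separator is a tab
    }

    public static void main(String[] args) {
        // A word-count pair like those produced by wordCounts above:
        System.out.println(formatRecord("spark", 3));
    }
}
```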

As was also asked: the Text class comes from the org.apache.hadoop.io package, and TextOutputFormat comes from the org.apache.hadoop.mapred package.

Regarding "java - Hadoop OutputFormat RuntimeException when running an Apache Spark Kafka stream", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38503502/
