java - Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder


I am trying to send messages from my Kafka producer and stream them with Spark Streaming, but when I run my application with spark-submit I get the error below.

Error:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at com.spark_stream.Main.main(Main.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more

The application code is as follows:

Main.java

package com.spark_stream;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Main {

    public static void main(String[] args) {
        System.out.println("spark started!");

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Micro-batches every 2 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Direct (receiver-less) Kafka stream: connect straight to the broker
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("speed");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // For each micro-batch, print partition/record counts and the message values
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });

        System.out.println("connection completed");

        ssc.start();
        ssc.awaitTermination();

        System.out.println("spark ended!");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spark_stream</groupId>
    <artifactId>com.spark_stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>

        <dependency> <!-- Spark core -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency> <!-- Spark Streaming -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency> <!-- Kafka integration for Spark Streaming -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

    </dependencies>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
</project>

I cannot find a solution for this error. Any help would be greatly appreciated.

Best Answer

Take a look at the documentation: http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit

More specifically, this part:

Path to a bundled jar including your application and all dependencies.

Your pom.xml, on the other hand, clearly builds a jar without its dependencies. The Spark runtime already provides spark-core and spark-streaming on the classpath, but not spark-streaming-kafka or its transitive Kafka dependency, which is where kafka.serializer.StringDecoder lives. That is why spark-submit cannot find the class kafka.serializer.StringDecoder.
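
You can verify this by listing the contents of the jar you pass to spark-submit (the path below assumes the default Maven output name produced by the pom.xml above); with no dependencies bundled, the grep comes back empty:

jar tf target/com.spark_stream-0.0.1-SNAPSHOT.jar | grep kafka/serializer/StringDecoder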

You probably want to build a so-called fat (or uber) jar that bundles your dependencies inside your application jar; the maven-assembly-plugin can do this for you.
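
As a minimal sketch, adding a <build> section like the following to the pom.xml above makes mvn package also produce a *-jar-with-dependencies.jar (the plugin version shown is just an example; any recent release works the same way):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <!-- Predefined descriptor that unpacks all dependencies into the jar -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.spark_stream.Main</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <!-- Run the assembly automatically during mvn package -->
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

After mvn clean package, submit the bundled jar instead of the plain one:

spark-submit --class com.spark_stream.Main target/com.spark_stream-0.0.1-SNAPSHOT-jar-with-dependencies.jar

If the resulting jar gets too large, you can mark spark-core and spark-streaming as <scope>provided</scope>, since spark-submit already puts them on the classpath; the Kafka artifacts must stay at the default scope.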

Regarding java - Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42566479/
