
scala - Spark Streaming + Kafka - Spark session API

Reposted. Author: 行者123. Updated: 2023-12-04 23:16:18

Thanks in advance for helping me get a Spark Streaming program running on Spark 2.0.2.

Running it fails with "java.lang.ClassNotFoundException: Failed to find data source: kafka". The updated POM file is below.

The SparkSession is created successfully, but the error is thrown when calling load() on the Kafka source.

Creating the Spark session:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master(master)
      .appName("Apache Log Analyzer Streaming from Kafka")
      .config("hive.metastore.warehouse.dir", hiveWarehouse)
      .config("fs.defaultFS", hdfs_FS)
      .enableHiveSupport()
      .getOrCreate()

Creating the Kafka stream:
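    // readStream returns a streaming DataFrame, not a DStream.
    // Note: 2181 is ZooKeeper's default port; kafka.bootstrap.servers expects
    // Kafka broker addresses (Kafka's default broker port is 9092).
    val logLinesDStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:2181")
      .option("subscribe", topics)
      .load()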

Error message:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org

POM.XML:
    <properties>
        <scala.version>2.10.4</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
    </dependencies>

Best answer

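You are referencing the Kafka integration for Spark v1.5.1 when you actually need 2.0.2. You also need sql-kafka for Structured Streaming: the spark-streaming-kafka-* artifacts only provide the DStream (KafkaUtils) integration, while the kafka data source used by spark.readStream lives in spark-sql-kafka-0-10: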

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.0.2</version>
    </dependency>
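To confirm the dependency actually reaches the runtime classpath (for example, that it was not dropped from a fat jar or omitted from spark-submit --packages/--jars), a quick reflection check can help. This is a minimal sketch of a hypothetical helper, not a Spark API; it assumes that in Spark 2.x the kafka short name is provided by org.apache.spark.sql.kafka010.KafkaSourceProvider, the class shipped in spark-sql-kafka-0-10.

```scala
// Hypothetical diagnostic helper (not part of Spark): reports whether a class
// can be found by the class loader the application runs under.
object KafkaSourceCheck {
  // Assumed provider class behind format("kafka") in spark-sql-kafka-0-10 (Spark 2.x).
  val KafkaProviderClass = "org.apache.spark.sql.kafka010.KafkaSourceProvider"

  def isOnClasspath(className: String): Boolean = {
    // Prefer the context class loader; fall back to this object's loader if it is null.
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    try {
      // initialize = false: only locate the class, do not run its static initializers.
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  def main(args: Array[String]): Unit = {
    if (isOnClasspath(KafkaProviderClass))
      println(s"Found $KafkaProviderClass; the kafka data source should resolve.")
    else
      println(s"Missing $KafkaProviderClass; add spark-sql-kafka-0-10 to the classpath.")
  }
}
```

Run it with the same classpath as the streaming job; if it reports the class as missing, the ClassNotFoundException in the question is expected.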

Note that the Structured Streaming Kafka source (used through the SparkSession API) only supports Kafka brokers version 0.10.0 or later.
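For reference, a minimal end-to-end sketch of the Structured Streaming read, once spark-sql-kafka-0-10 is on the classpath, might look like the following. It assumes a Kafka 0.10+ broker on localhost:9092 (Kafka's default broker port) and a hypothetical topic named apache-logs; the Hive and HDFS settings from the question are left out, and the console sink is used only to show that records arrive.

```scala
import org.apache.spark.sql.SparkSession

// Minimal Structured Streaming sketch (Spark 2.0.2+, spark-sql-kafka-0-10 on the classpath).
object KafkaLogStreamSketch {
  // Assumed settings: broker address and topic name are placeholders for this sketch.
  val KafkaOptions: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092", // Kafka broker(s), not ZooKeeper
    "subscribe" -> "apache-logs"                   // hypothetical topic; a comma-separated list is allowed
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Apache Log Analyzer Streaming from Kafka")
      .getOrCreate()

    // Streaming DataFrame with columns key, value, topic, partition, offset, timestamp, timestampType.
    val kafkaDF = spark
      .readStream
      .format("kafka")
      .options(KafkaOptions)
      .load()

    // key and value arrive as binary; cast the value to a string log line.
    val logLines = kafkaDF.selectExpr("CAST(value AS STRING) AS line")

    // Print each micro-batch to stdout; blocks until the query stops or fails.
    val query = logLines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

Replace the placeholder broker and topic with real values, and either package spark-sql-kafka-0-10_2.10:2.0.2 with the job or pass it via spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2.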

For a similar question about scala - Spark Streaming + Kafka - Spark session API, see Stack Overflow: https://stackoverflow.com/questions/41100579/
