
scala - Unable to send a Spark DataFrame to Kafka (java.lang.ClassNotFoundException: Failed to find data source: kafka.)

Reposted; author: 行者123; updated: 2023-12-03 05:42:10

I am running into a problem pushing data to Kafka from a Spark DataFrame.

Let me explain my scenario in detail with an example. I want to load data into Spark and send the Spark output to Kafka. I am using Gradle 3.5, Spark 2.3.1, and Kafka 1.0.1.

Here is my build.gradle:

buildscript {
    ext {
        springBootVersion = '1.5.15.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'scala'
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

group = 'com.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter')
    compile('org.apache.spark:spark-core_2.11:2.3.1')
    compile('org.apache.spark:spark-sql_2.11:2.3.1')
    compile('org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1')
    compile('org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1')

    testCompile('org.springframework.boot:spring-boot-starter-test')
}

Here is my code:
package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object SparkConnection {

  case class emp(empid: Integer, empname: String, empsal: Float)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Spark Connection").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val dataRdd = sc.textFile("/home/sample/data/sample.txt")
    val mapRdd = dataRdd.map(row => row.split(","))
    val empRdd = mapRdd.map(row => emp(row(0).toInt, row(1), row(2).toFloat))

    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val empDF = empRdd.toDF()

    empDF
      .select(to_json(struct(empDF.columns.map(column): _*)).alias("value"))
      .write.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "my-kafka-topic")
      .save()
  }
}
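As a side note, the comma-split parsing step in the job above can be sketched and tested independently of Spark. The sample rows below are hypothetical, since the original sample.txt is not shown:

```scala
// Minimal, Spark-free sketch of the parsing step used in the job above.
// The sample rows are hypothetical; the real input file is not shown.
case class Emp(empid: Int, empname: String, empsal: Float)

def parseLine(row: String): Emp = {
  val cols = row.split(",")
  Emp(cols(0).trim.toInt, cols(1).trim, cols(2).trim.toFloat)
}

val sampleRows = Seq("1,John,5000.0", "2,Jane,6500.5")
val parsed = sampleRows.map(parseLine)
parsed.foreach(println)
```

Factoring the parsing out like this also makes it easy to unit-test the record mapping before wiring it into an RDD.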

Please ignore the Spring Boot framework APIs in the build.gradle.

After building the package with Gradle, I can see all of the dependency classes mentioned in the .gradle file.

But when I run the code with spark-submit:
spark-submit --class com.sample.SparkConnection spark_kafka_integration.jar

I get the following error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at com.iniste.SparkConnection$.main(SparkConnection.scala:29)
at com.iniste.SparkConnection.main(SparkConnection.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 13 more
2018-09-05 17:41:17 INFO SparkContext:54 - Invoking stop() from shutdown hook
2018-09-05 17:41:17 INFO AbstractConnector:318 - Stopped Spark@51684e4a{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-09-05 17:41:17 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-09-05 17:41:17 INFO MemoryStore:54 - MemoryStore cleared
2018-09-05 17:41:17 INFO BlockManager:54 - BlockManager stopped
2018-09-05 17:41:17 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-09-05 17:41:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-09-05 17:41:17 INFO SparkContext:54 - Successfully stopped SparkContext
2018-09-05 17:41:17 INFO ShutdownHookManager:54 - Shutdown hook called
2018-09-05 17:41:17 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-bd4cb4ef-3883-4c26-a93f-f355b13ef306
2018-09-05 17:41:17 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-156dfdbd-cff4-4c70-943f-35ef403a01ed

Please help me get rid of this error. Some blogs suggest using the --packages option with spark-submit, but I am behind a proxy that prevents downloading the packages mentioned above. What I cannot understand is why spark-submit does not pick up the jars that are already available. Please correct me where I am going wrong.

Best Answer

As with any Spark application, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.11 and its dependencies can be added directly to spark-submit using --packages, as in the example below:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class com.sample.SparkConnection spark_kafka_integration.jar

Further details on the --packages option can be found in the Spark documentation.
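Since --packages downloads artifacts from Maven Central at submit time, it may be blocked by the proxy you mentioned. As an alternative sketch, --jars accepts jars you already have locally (for example in the Gradle cache). The paths below are hypothetical and depend on your environment:

```shell
# Hedged sketch: pass locally cached jars instead of downloading with --packages.
# The /path/to placeholders are hypothetical; look for the real jars in your
# Gradle cache (e.g. under ~/.gradle/caches/modules-2/files-2.1/).
spark-submit \
  --jars /path/to/spark-sql-kafka-0-10_2.11-2.3.1.jar,/path/to/kafka-clients.jar \
  --class com.sample.SparkConnection \
  spark_kafka_integration.jar
```

Note that --jars does not resolve transitive dependencies, so the Kafka client jar must be listed explicitly alongside the Spark-Kafka connector.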

However, following cricket_007's suggestion, I have added the Shadow plugin to your build.gradle.
The new version could look similar to this:
buildscript {
    ext {
        springBootVersion = '1.5.15.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id "com.github.johnrengelman.shadow" version "2.0.4"
}

apply plugin: 'scala'
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: "com.github.johnrengelman.shadow"

group = 'com.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter')
    compile('org.apache.spark:spark-core_2.11:2.3.1')
    compile('org.apache.spark:spark-sql_2.11:2.3.1')
    compile('org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1')
    compile('org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1')
    compile('org.scala-lang:scala-library:2.11.12')
    // https://mvnrepository.com/artifact/org.apache.kafka/kafka
    //compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'

    testCompile('org.springframework.boot:spring-boot-starter-test')
}

shadowJar {
    baseName = "spark_kafka_integration"
    zip64 true
    classifier = null
    version = null
    // Merge META-INF/services files so Spark's ServiceLoader can still
    // discover the Kafka DataSourceRegister inside the fat jar; without
    // this, shading can overwrite the registration and reproduce the
    // "Failed to find data source: kafka" error.
    mergeServiceFiles()
}

So, to create your jar, the command is: gradle shadowJar.
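One way to sanity-check the resulting fat jar (a hypothetical check, assuming the jar lands in build/libs/ as configured above) is to inspect the merged service file that Spark uses to discover data sources:

```shell
# The DataSourceRegister service file inside the fat jar should list
# org.apache.spark.sql.kafka010.KafkaSourceProvider, which registers the
# "kafka" format. If the entry is missing, service files were overwritten
# during shading (Shadow's mergeServiceFiles() addresses this).
unzip -p build/libs/spark_kafka_integration.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
```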

Regarding "scala - Unable to send a Spark DataFrame to Kafka (java.lang.ClassNotFoundException: Failed to find data source: kafka.)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52202990/
