
scala - org.apache.spark.SparkException: Task not serializable (caused by org.apache.hadoop.conf.Configuration)


I want to write the transformed stream to an Elasticsearch index, as follows:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})
The line val messages = rdd.map(prepare) throws the error shown below. I am stuck: I have tried to solve this in various ways (e.g. adding @transient to val conf), but nothing seems to work.

6/06/28 19:23:00 ERROR JobScheduler: Error running job streaming job 1467134580000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
    - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
    - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
    - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, )
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    (the same stack trace, the same "Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration" and the same serialization stack are then repeated for the main thread)



Is it somehow related to Hadoop's Configuration? (I am referring to this message: class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
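
The serialization stack points at exactly that chain: the closure created by rdd.map(prepare) keeps a reference to its enclosing EsStream instance ($outer), and that instance holds the conf field of type org.apache.hadoop.conf.Configuration, which does not implement java.io.Serializable. A minimal sketch of the pattern (hypothetical class and member names, not the original code):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

// Even though Owner itself is Serializable, its Configuration field is not.
class Owner(conf: Configuration) extends Serializable {

  // `conf` is used in a method, so the compiler keeps it as a field of Owner
  def esNodes: String = conf.get("es.nodes")

  def prepare(s: String): String = s.toUpperCase

  def run(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Seq("a", "b"))
    // rdd.map(prepare) is equivalent to rdd.map(x => this.prepare(x)): the
    // closure captures `this`, and serializing `this` drags in the
    // Configuration field, which triggers "Task not serializable".
    rdd.map(prepare).foreach(println)
  }
}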

Update:
class EsStream(name:String,conf:HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */
  val ec = getEsConf(conf)

  /* Kafka configuration */
  val (kc,topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name,conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    /*
     * Inline transformation of the incoming stream by any function that maps
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)
    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }

  def transform(stream:DStream[String]) = stream

  private def getEsConf(config:HConf):HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", conf.get("es.nodes"))
    _conf.set("es.port", conf.get("es.port"))
    _conf.set("es.resource", conf.get("es.resource"))

    _conf

  }

  private def getKafkaConf(config:HConf):(Map[String,String],Map[String,Int]) = {

    val cfg = Map(
      "group.id" -> conf.get("kafka.group"),
      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")
    )

    val topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap

    (cfg,topics)

  }

  private def prepare(message:String):(Object,Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }

    val kw = NullWritable.get

    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)

  }

}

Best Answer

Remove conf:HConf from the class constructor of EsStream and write it as class EsStream(name:String).

Next, create a method with the signature def init(conf: HConf): Map[String, String].
In this method, read the required configuration and update ec and (kc, topics) there.

After that, you should call the run method.
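
A rough sketch of what that refactoring could look like (it reuses SparkBase, HConf, prepare and the other identifiers from the question; the field names, the per-batch rebuild of the output Configuration, and run taking the HConf as a parameter for createSSCLocal are assumptions, not the answer author's code). The key point is that EsStream no longer keeps any org.apache.hadoop.conf.Configuration in a field, so the instance captured by rdd.map(prepare) serializes cleanly:

class EsStream(name:String) extends SparkBase with Serializable {

  /* Plain, serializable settings, filled in on the driver by init() */
  private var esSettings = Map.empty[String,String]
  private var kafkaConf  = Map.empty[String,String]
  private var topics     = Map.empty[String,Int]

  def init(conf:HConf):Map[String,String] = {

    esSettings = Map(
      "es.nodes"    -> conf.get("es.nodes"),
      "es.port"     -> conf.get("es.port"),
      "es.resource" -> conf.get("es.resource"))

    kafkaConf = Map(
      "group.id"                        -> conf.get("kafka.group"),
      "zookeeper.connect"               -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout"))

    topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap

    esSettings
  }

  /* The Configuration needed by createSSCLocal is passed in as a plain
   * parameter instead of being kept as a field of the class */
  def run(conf:HConf) {

    val ssc = createSSCLocal(name,conf)

    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafkaConf,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    val transformed = transform(stream)

    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        /* Rebuild the output Configuration per batch, on the driver, so no
         * Configuration object is held by EsStream or shipped with the task */
        val ec = new HConf()
        esSettings.foreach { case (k,v) => ec.set(k,v) }

        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def transform(stream:DStream[String]) = stream

  /* prepare stays as in the question: it only touches serializable values */
  private def prepare(message:String):(Object,Object) = {
    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }
    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))
    (NullWritable.get, vw)
  }

}

The assumed call sequence on the driver would then be: create the EsStream with new EsStream("EsStream"), call init(conf) with the HConf read by the caller, and afterwards call run(conf).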

Regarding scala - org.apache.spark.SparkException: Task not serializable (caused by org.apache.hadoop.conf.Configuration), the original question can be found on Stack Overflow: https://stackoverflow.com/questions/38082657/
