gpt4 book ai didi

scala - Spark 上的序列化异常

转载 作者:行者123 更新时间:2023-12-01 18:10:03 24 4
gpt4 key购买 nike

我在 Spark 上遇到了一个关于序列化的非常奇怪的问题。代码如下:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
def infer(document: RDD[Document]): RDD[DocumentParameter] = {
val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
docs
}
}

其中文档定义为:

class Document(val tokens: SparseVector[Int]) extends Serializable

文档参数是:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,
Array.ofDim[Float](numOfTopics))
}

SparseVector是breeze.linalg.SparseVector中的一个可序列化类。

这是一个简单的映射过程,所有类都是可序列化的,但我得到了这个异常:

org.apache.spark.SparkException: Task not serializable

但是当我删除 numOfTopics 参数时,即:

object DocumentParameter extends Serializable
{
def apply(document: Document) = new DocumentParameter(document,
Array.ofDim[Float](10))
}

并这样调用它:

val docs = documents.map(DocumentParameter.apply)

看起来还不错。

Int 类型不可序列化吗?但我确实看到有些代码是这样写的。

我不知道如何修复这个错误。

#更新#:

谢谢@samthebest。我将添加更多有关它的详细信息。

stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more

由于堆栈跟踪提供了异常的一般信息,因此我将其删除。

我在 Spark-Shell 中运行代码。

// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

您能给我一些有关序列化的教程或技巧吗?

最佳答案

匿名函数序列化其包含的类。当你 map {doc => DocumentParameter(doc, numOfTopics)} ,它可以让该函数访问 numOfTopics 的唯一方法就是序列化PLSA类(class)。该类实际上无法序列化,因为(正如您从堆栈跟踪中看到的)它包含 SparkContext这是不可序列化的(如果单个集群节点可以访问上下文并且可以例如从映射器内创建新作业,则会发生不好的事情)。

一般来说,尽量避免存储 SparkContext在你的类中(编辑:或者至少,确保非常清楚哪种类包含SparkContext,哪种类不包含);最好将其作为(可能是 implicit )参数传递给需要它的各个方法。或者,移动函数 {doc => DocumentParameter(doc, numOfTopics)}进入与 PLSA 不同的类(class),一个真正可以序列化的。

(正如很多人所建议的,可以将SparkContext保留在类中,但标记为@transient,这样它就不会被序列化。我不推荐这种方法;这意味着该类将““神奇地”在序列化时改变状态(丢失 SparkContext ),因此当您尝试从序列化作业内部访问 SparkContext 时,您可能最终会遇到 NPE。最好在仅使用的类之间保持明确的区别在“控制”代码中(并且可能使用 SparkContext )和序列化为在集群上运行的类(不得具有 SparkContext ))。

关于scala - Spark 上的序列化异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27579474/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com