
java - Why does this Spark code throw java.io.NotSerializableException

Reposted · Author: 搜寻专家 · Updated: 2023-11-01 02:01:57

I want to call a method on the companion object from inside an RDD transformation. Why does the following not work:

import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect

The code above throws a java.io.NotSerializableException:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at Abc.transform(<console>:19)
... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
- object not serializable (class: Abc, value: Abc@4f598dfb)
- field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
- object (class Abc$$anonfun$transform$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 57 more

Even defining an Encoder for class Abc does not help here. But the more important question is: why does it try to serialize an object of class Abc at all? My first thought was that, since the companion object is the singleton associated with the class, Spark might be attempting to serialize it. But that does not seem to be the case, because when I call Abc.fn from another class:

class Xyz {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect

I get a java.io.NotSerializableException: Xyz instead.

Best Answer

Here is a great article discussing "serializable" versus "non-serializable" objects in Apache Spark:

Using Non-Serializable Objects in Apache Spark, Nicola Ferraro

The article covers:

  • what is happening in your particular case

  • some alternatives, so that your object does not need to be "serializable"
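The serialization stack in the question already points at the mechanism: the anonymous function generated for x.map(Abc.fn) is compiled as an inner class of Abc (it appears in the trace as Abc$$anonfun$transform$1 with a $outer field), so the closure drags the enclosing Abc instance along, and that instance is not serializable. Two standard workarounds, sketched here as assumptions for a Spark 2.x spark-shell session; transformViaObject is a hypothetical helper name, not from the question:

```scala
import org.apache.spark.rdd.RDD

// Workaround 1: make the enclosing class Serializable, so the captured
// $outer reference can legally be shipped to the executors.
class Abc extends Serializable {
  def transform(x: RDD[Int]): RDD[Double] = x.map(Abc.fn)
}

object Abc {
  def fn(x: Int): Double = x.toDouble

  // Workaround 2 (hypothetical helper): keep the transformation on the
  // companion object itself; a method of a top-level object closes over
  // no enclosing instance, so there is no $outer to serialize.
  def transformViaObject(x: RDD[Int]): RDD[Double] = x.map(fn)
}
```

Either variant should let new Abc().transform(sc.parallelize(1 to 10)).collect (or Abc.transformViaObject(...)) run without the "Task not serializable" error, since nothing non-serializable remains in the closure.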

Regarding "java - Why does this Spark code throw java.io.NotSerializableException", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43827007/
