gpt4 book ai didi

scala - Spark函数不可序列化

转载 作者:行者123 更新时间:2023-12-01 00:14:23 25 4
gpt4 key购买 nike

我有一个类:

class DataLoader {

def rdd2RddTransform(
ss: SparkSession,
inputRDD: RDD[GenericRecord]): RDD[GenericRecord] = {

inputRDD.asInstanceOf[RDD[TrainingData]]
.map(reformatTrainingData)
}

private def reformatTrainingData: TrainingData => ReFormatedData
= (trainingData: TrainingData) => {func implement}

}

它工作得很好,但抛出了一个异常: org.apache.spark.SparkException: Task not serializable在我对 RDD 的 map 做了一个小改动之后:
inputRDD.asInstanceOf[RDD[TrainingData]].map(reformatTrainingData(_))

我认为这两个功能应该相同,但似乎并非如此。为什么它们不同?

最佳答案

这是因为 Scala 中的方法和函数不能完全互换。

函数是独立的对象(即类的实例,例如 Function1Function2Function3 ...),但方法仍然与它们的封闭类相关联。如果封闭类不是 Serializable,这可能会在 Spark 中产生问题。 - 当 Spark 尝试序列化方法时,它无法序列化关联的类实例。

请注意,您的 reformatTrainingData是一个返回函数的方法

因此,当您调用以下内容时:

rdd.map(reformatTrainingData)

您实际上是在调用无参数 reformatTrainingData方法并返回一个独立的 Function1可以安全序列化的实例。你也可以这样写
private def reformatTrainingData(): TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData())

强调正在发生方法调用。

当您更改为 reformatTrainingData(_)相反,您使用的是部分应用的方法;当 Spark 试图序列化它时,它需要拉入并序列化封闭的 DataLoader类,未标记为 Serializable .

如果 reformatTrainingData 也会出现同样的问题。是 TrainingData => ReFormatedData 类型的简单方法.

如果您标记 DataLoaderextends Serializable ,那么任何一个版本都应该可以工作。

也可以制作 reformatTrainingData变成 val , 因为 val 在序列化时不会拉入封闭类:
private val reformatTrainingData: TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData)

关于scala - Spark函数不可序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54473877/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com