gpt4 book ai didi

scala - 仅在类而非对象上调用闭包外部的函数时,任务不可序列化 : java. io.NotSerializedException

转载 作者:行者123 更新时间:2023-12-03 04:09:27 27 4
gpt4 key购买 nike

在闭包之外调用函数时出现奇怪的行为:

  • 当函数位于对象中时,一切正常
  • 当函数在类中时获取:

Task not serializable: java.io.NotSerializableException: testing

问题是我需要类中的代码而不是对象中的代码。知道为什么会发生这种情况吗? Scala 对象是否已序列化(默认?)?

这是一个有效的代码示例:

object working extends App {
val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))

def someFunc(a:Int) = a+1

after.collect().map(println(_))
}

这是非工作示例:

object NOTworking extends App {
new testing().doIT
}

//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)

def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}

def someFunc(a:Int) = a+1
}

最佳答案

RDDs extend the Serialisable interface ,所以这不是导致您的任务失败的原因。现在这并不意味着您可以序列化 RDD使用 Spark 并避免 NotSerializableException

Spark是一个分布式计算引擎,其主要抽象是弹性分布式数据集(RDD),可以将其视为分布式集合。基本上,RDD 的元素跨集群的节点进行分区,但 Spark 将其从用户中抽象出来,让用户与 RDD(集合)进行交互,就好像它是本地 RDD 一样。

不要涉及太多细节,但是当您在 RDD 上运行不同的转换( mapflatMapfilter 等)时,您的转换代码(闭包)是:

  1. 在驱动程序节点上序列化,
  2. 发送到集群中的适当节点,
  3. 反序列化,
  4. 最终在节点上执行

您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络运输之外)仍然会发生。 [这可以让您在部署到生产之前捕获任何错误]

在第二种情况下,您正在调用类 testing 中定义的方法。从 map 函数内部。 Spark 发现了这一点,并且由于方法无法自行序列化,Spark 尝试序列化整个 testing类,以便代码在另一个 JVM 中执行时仍然可以工作。您有两种可能性:

要么使类测试可序列化,以便 Spark 可以序列化整个类:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
new Test().doIT
}

class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))

def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}

def someFunc(a: Int) = a + 1
}

或者你做someFunc函数而不是方法(函数是 Scala 中的对象),以便 Spark 能够序列化它:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
new Test().doIT
}

class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))

def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}

val someFunc = (a: Int) => a + 1
}

您可能会对类序列化的类似但不相同的问题感兴趣,您可以阅读它 in this Spark Summit 2013 presentation .

作为旁注,您可以重写 rddList.map(someFunc(_))rddList.map(someFunc) ,它们是完全相同的。通常,第二个是首选,因为它更简洁,读起来更清晰。

编辑(2015-03-15):SPARK-5307引入了SerializationDebugger,Spark 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializedException。当遇到NotSerializedException时,调试器会访问对象图来查找无法序列化的对象的路径,并构造信息来帮助用户找到该对象。

在OP的例子中,这是打印到标准输出的内容:

Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)

关于scala - 仅在类而非对象上调用闭包外部的函数时,任务不可序列化 : java. io.NotSerializedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22592811/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com