gpt4 book ai didi

java - Spark - 使用不可序列化的成员序列化对象

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:54:11 30 4
gpt4 key购买 nike

我将在 Spark 的上下文中提出这个问题,因为这就是我面临的问题,但这可能是一个普通的 Java 问题。

在我们的 spark 作业中,我们有一个 Resolver 需要在我们所有的 worker 中使用(它在 udf 中使用)。问题是它不可序列化,我们无法将其更改为可序列化。解决方案是将其作为另一个可序列化的类的成员。

所以我们最终得到:

public class Analyzer implements Serializable {
transient Resolver resolver;

public Analyzer() {
System.out.println("Initializing a Resolver...");
resolver = new Resolver();
}

public int resolve(String key) {
return resolver.find(key);
}
}

然后我们使用 Spark API 广播这个类:

 val analyzer = sparkContext.broadcast(new Analyzer())

(更多关于Spark广播的信息可以查看here)

然后我们继续在 UDF 中使用 analyzer,作为我们的 spark 代码的一部分,例如:

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()

这一切都按预期工作,但让我们感到疑惑。

Resolver 没有实现 Serializable,因此被标记为 transient - 这意味着它不会与其所有者对象一起被序列化 分析器

但是从上面的代码可以清楚的看到,resolve()方法使用了resolver,所以一定不能为null。事实上它不是。代码有效。

那么如果字段不通过序列化传递,resolver成员是如何实例化的呢?

我最初的想法是,也许 Analyzer 构造函数在接收方(即 spark worker)被调用,但后来我希望看到行 "Initializing a Resolver.. ” 打印了几次。但它只打印一次,这可能表明它只被调用一次,就在它传递给广播 API 之前。那么为什么 resolver 不为空?

我是否遗漏了有关 JVM 序列化或 Spark 序列化的内容?

这段代码是如何工作的?

Spark 在 YARN 上以 cluster 模式运行。spark.serializer 设置为 org.apache.spark.serializer.KryoSerializer

最佳答案

So if the field is not passed through serialization, how is the resolver member instantiated?

它是通过构造函数调用(new Resolver)实例化的,当调用kryo.readObject时:

kryo.readClassAndObject(input).asInstanceOf[T]

My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which is probably an indication to the fact that it's only called once

这不是广播变量的工作方式。发生的情况是,当每个 Executor 需要范围内的广播变量时​​,它首先检查它是否在其 BlockManager 内存中有该对象,如果没有,它会询问驱动程序或邻居执行程序(如果同一个 Worker 节点上有多个执行器)为他们缓存的实例,他们将它序列化并发送给他,然后他接收实例并将其缓存在他自己的 BlockManager 中。

这记录在 TorrentBroadcast 的行为中(这是默认的广播实现):

* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).

if we remove the transient it fails, and the stack-trace leads to Kryo

那是因为您的 Resolver 类中可能有一个字段,即使 Kryo 也无法序列化,无论 Serializable 属性如何。

关于java - Spark - 使用不可序列化的成员序列化对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48371045/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com