- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
免责声明:刚开始玩 Spark。
我无法理解著名的“任务不可序列化”异常,但我的问题与我在 SO 上看到的问题有点不同(或者我认为如此)。
我有一个很小的自定义 RDD (TestRDD
)。它有一个字段,用于存储其类未实现可序列化 (NonSerializable
) 的对象。我已将“spark.serializer”配置选项设置为使用 Kryo。但是,当我在我的 RDD 上尝试 count()
时,我得到以下信息:
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
当我查看 DAGScheduler.submitMissingTasks
时,我看到它在我的 RDD 上使用了它的 closure 序列化器,它是 Java 序列化器,而不是我想要的 Kryo 序列化器预计。我读过 Kryo 在序列化闭包方面存在问题,而 Spark 始终使用 Java 序列化程序来进行闭包,但我完全不明白闭包是如何在这里发挥作用的。我在这里所做的就是:
SparkConf conf = new SparkConf()
.setAppName("ScanTest")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
也就是说,没有映射器或任何需要序列化闭包的东西。 OTOH 这行得通:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
Kryo 序列化器按预期使用,不涉及闭包序列化器。如果我没有将序列化程序属性设置为 Kryo,我也会在此处遇到异常。
我感谢任何解释闭包来源以及如何确保我可以使用 Kryo 序列化自定义 RDD 的指示。
更新:这是带有不可序列化字段mNS
的TestRDD
:
class TestRDD extends RDD<String> {
private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
NonSerializable mNS = new NonSerializable();
public TestRDD(final SparkContext _sc) {
super(_sc,
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
STRING_TAG);
}
@Override
public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
"test_" + thePartition.index(),
"test_" + thePartition.index()).iterator()).asScala();
}
@Override
public Partition[] getPartitions() {
return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
}
static class TestPartition implements Partition {
final int mIndex;
public TestPartition(final int theIndex) {
mIndex = theIndex;
}
public int index() {
return mIndex;
}
}
}
最佳答案
When I look inside
DAGScheduler.submitMissingTasks
I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer which I'd expect.
SparkEnv
支持两种序列化器,一种名为serializer
,用于数据序列化、检查点、工作人员之间的消息传递等,可在 spark 下使用。 serializer
配置标志。另一个称为 spark.closure.serializer
下的 closureSerializer
,用于检查您的对象实际上是可序列化的并且可配置为 Spark <= 1.6.2(但没有除了 JavaSerializer
实际工作之外)并从 2.0.0 及更高版本硬编码到 JavaSerializer
。
Kryo 闭包序列化程序有一个错误导致它无法使用,您可以在 SPARK-7708 下查看该错误(这可能已通过 Kryo 3.0.0 修复,但 Spark 目前已通过特定版本的 Chill 修复,该版本已在 Kryo 2.2.1 上修复)。此外,对于 Spark 2.0.x,JavaSerializer 现在是固定的而不是可配置的(您可以看到它 in this pull request )。这意味着实际上我们只能使用 JavaSerializer
来进行闭包序列化。
我们使用一个序列化器来提交任务,而另一个序列化器在工作人员之间序列化数据,这很奇怪吗?当然可以,但这就是我们所拥有的。
总而言之,如果您正在设置 spark.serializer
配置,或使用 SparkContext.registerKryoClasses
,您将在 Spark 中使用 Kryo 进行大部分序列化.话虽如此,为了检查给定类是否可序列化并将任务序列化给工作人员,Spark 将使用 JavaSerializer
。
关于java - 了解 Spark 的闭包及其序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40259196/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!