- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如何配置 Spark 以将 KryoSerializer 用于 lambda?还是我在 Spark 中发现了错误?我们对其他地方的数据序列化没有问题,只是这些 lambda 使用默认值而不是 Kryo。
代码如下:
JavaPairRDD<String, IonValue> rdd; // provided
IonSexp filterExpression; // provided
Function<Tuple2<String, IonValue>, Boolean> filterFunc = record -> myCustomFilter(filterExpression, record);
rdd = rdd.filter(filterFunc);
异常抛出:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:388)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.filter(RDD.scala:387)
at org.apache.spark.api.java.JavaPairRDD.filter(JavaPairRDD.scala:99)
at com.example.SomeClass.process(SomeClass.java:ABC)
{more stuff}
Caused by: java.io.NotSerializableException: com.amazon.ion.impl.lite.IonSexpLite
Serialization stack:
- object not serializable (class: com.amazon.ion.impl.lite.IonSexpLite, value: (and (equals (literal 1) (path marketplace_id)) (equals (literal 351) (path product gl_product_group))))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.example.SomeClass, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial com/example/SomeClass.lambda$process$8f20a2d2$1:(Lcom/amazon/ion/IonSexp;Lscala/Tuple2;)Ljava/lang/Boolean;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/lang/Boolean;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.example.SomeClass$$Lambda$36/263969036, com.example.SomeClass$$Lambda$36/263969036@31880efa)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 18 more
在这种情况下,有问题的 filterExpression
是一个 Ion S-Expression对象,它没有实现 java.io.Serializable
。我们正在使用 Kryo 序列化程序并已注册和配置它,以便它可以很好地序列化它。
初始化spark配置时的代码:
sparkConf = new SparkConf().setAppName("SomeAppName").setMaster("MasterLivesHere")
.set("spark.serializer", KryoSerializer.class.getCanonicalName())
.set("spark.kryo.registrator", KryoRegistrator.class.getCanonicalName())
.set("spark.kryo.registrationRequired", "false");
注册器中的代码:
kryo.register(com.amazon.ion.IonSexp.class);
kryo.register(Class.forName("com.amazon.ion.impl.lite.IonSexpLite"));
如果我尝试使用下面的代码手动序列化该 lambda
SerializationUtils.serialize(filterFunc);
正如预期的那样,它失败并出现相同的错误,因为 filterExpression
不可序列化。但是,下面的代码有效:
sparkContext.env().serializer().newInstance().serialize(filterFunc, ClassTag$.MODULE$.apply(filterFunc.getClass()));
这再次符合预期,因为我们的 Kryo 设置能够处理这些对象。
所以我的问题/困惑是,为什么 Spark 尝试使用 org.apache.spark.serializer.JavaSerializer
序列化该 lambda,而我们已经明确地将其配置为使用 Kryo?
最佳答案
经过更多的挖掘后发现确实有一个不同的序列化器被用于闭包。由于 Kryo 的错误,闭包序列化程序被硬编码为默认序列化程序。
这个答案很好地解释了它:https://stackoverflow.com/a/40261550/2158122
不过,我能够使用广播解决我的特殊问题。
这是我的代码现在的样子:
JavaSparkContext sparkContext; // provided
JavaPairRDD<String, IonValue> rdd; // provided
IonSexp filterExpression; // provided
Broadcast<IonSexp> filterExprBroadcast = sparkContext.broadcast(filterExpression);
rdd = rdd.filter(record -> myCustomFilter(filterExprBroadcast.value(), record));
filterExprBroadcast.destroy(false); // Only do this after an action is executed
广播的处理方式与 RDD 类似,因此它确实使用了已配置的 Kryo 序列化器。
关于java - 在 Spark 中配置函数/lambda 序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58887548/
可以使用 lambda 和函数创建有序对(Lisp 中的缺点),如 Use of lambda for cons/car/cdr definition in SICP 所示。 它也适用于 Python
我正在尝试从另一个调用一个 AWS lambda 并执行 lambda 链接。这样做的理由是 AWS 不提供来自同一个 S3 存储桶的多个触发器。 我创建了一个带有 s3 触发器的 lambda。第一
根据以下源代码,常规 lambda 似乎可以与扩展 lambda 互换。 fun main(args: Array) { val numbers = listOf(1, 2, 3) f
A Tutorial Introduction to the Lambda Calculus 本文介绍乘法函数 The multiplication of two numbers x and y ca
我想弄清楚如何为下面的表达式绘制语法树。首先,这究竟是如何表现的?看样子是以1和2为参数,如果n是 0,它只会返回 m . 另外,有人可以指出解析树的开始,还是一个例子?我一直找不到一个。 最佳答案
在 C++0x 中,我想知道 lambda 函数的类型是什么。具体来说: #include type1 foo(int x){ return [x](int y)->int{return x * y
我在其中一个职位发布中看到了这个问题,它询问什么是 lambda 函数以及它与高阶函数的关系。我已经知道如何使用 lambda 函数,但不太自信地解释它,所以我做了一点谷歌搜索,发现了这个:What
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
Evaluate (((lambda(x y) (lambda (x) (* x y))) 5 6) 10) in Scheme. 我不知道实际上该怎么做! ((lambda (x y) (+ x x
我正在处理 MyCustomType 的实例集合如下: fun runAll(vararg commands: MyCustomType){ commands.forEach { it.myM
Brian 在他对问题 "Are side effects a good thing?" 的论证中的前提很有趣: computers are von-Neumann machines that are
在 Common Lisp 中,如果我希望两个函数共享状态,我将按如下方式执行 let over lambda: (let ((state 1)) (defun inc-state () (in
Evaluate (((lambda(x y) (lambda (x) (* x y))) 5 6) 10) in Scheme. 我不知道实际上该怎么做! ((lambda (x y) (+ x x
作为lambda calculus wiki说: There are several possible ways to define the natural numbers in lambda cal
我有一个数据类,我需要初始化一些 List .我需要获取 JsonArray 的值(我使用的是 Gson)。 我做了这个函数: private fun arrayToList(data: JsonAr
((lambda () )) 的方案中是否有简写 例如,代替 ((lambda () (define x 1) (display x))) 我希望能够做类似的事情 (empty-lam
我在 Java library 中有以下方法: public void setColumnComparator(final int columnIndex, final Comparator colu
我正在研究一个函数来计算国际象棋游戏中棋子的有效移动。 white-pawn-move 函数有效。当我试图将其概括为任一玩家的棋子 (pawn-move) 时,我遇到了非法函数调用。我已经在 repl
考虑这段代码(在 GCC 和 MSVC 上编译): int main() { auto foo = [](auto p){ typedef decltype(p) p_t;
我正在阅读一个在 lambda 内部使用 lambda 的片段,然后我想通过创建一个虚拟函数来测试它,该函数从文件中读取然后返回最大和最小数字。 这是我想出来的 dummy = lambda path
我是一名优秀的程序员,十分优秀!