gpt4 book ai didi

scala - 通过字符串反射定义spark udf

转载 作者:行者123 更新时间:2023-12-02 18:50:02 25 4
gpt4 key购买 nike

我正在尝试从包含 scala 函数定义的字符串在 Spark(2.0) 中定义 udf。以下是代码片段:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

这给了我一个错误:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

但是当我将 udf 定义为:

val f = udf((s:String) => 5)

它工作得很好。这里有什么问题?最终目标是获取一个具有 scala 函数的 defn 的字符串并将其用作 udf。

最佳答案

正如 Giovanny 观察到的,问题在于类加载器不同(您可以通过在任何对象上调用 .getClass.getClassLoader 来进一步研究这一点)。然后,当工作人员尝试反序列化您的反射函数时,一切都会崩溃。

这是一个不涉及任何类加载器黑客行为的解决方案。这个想法是将反射(reflection)步骤转移给 worker 。我们最终将不得不重做反射步骤,但每个工作人员仅一次。我认为这是非常理想的 - 即使您只在主节点上进行一次反射,您也必须为每个工作人员做相当多的工作才能让他们识别该功能。

val f = udf {
new Function1[String,Int] with Serializable {
import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

lazy val toolbox = currentMirror.mkToolBox()
lazy val func = {
println("reflected function") // triggered at every worker
toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
}

def apply(s: String): Int = func(s)
}
}

然后,调用 sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show 就可以了。

随意注释掉println - 它只是计算反射发生次数的简单方法。在 spark-shell --master 'local' 中只有一次,但在 spark-shell --master 'local[2]' 中是两次。

它是如何工作的

UDF 会立即求值,但在到达工作节点之前不会被使用,因此惰性值 toolboxfunc 仅在工作节点上求值。此外,由于它们很懒,每个工作人员只对它们进行一次评估。

关于scala - 通过字符串反射定义spark udf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38848847/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com