
apache-spark - Spark 2.4.0 functions.udf does not work with collections


The following code worked correctly in Spark 2.x versions prior to 2.4.0:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MyApp extends App {

  val spark = SparkSession.builder
    .appName("udf check")
    .master("local[*]")
    .getOrCreate
  import spark.implicits._

  val initDf = spark.read
    .option("delimiter", "|")
    .csv("input.txt")
    .select($"_c0".alias("person"), split($"_c1", ",").alias("friends"))

  // UDFs
  val reverse_friends_name = udf((friends: Seq[String]) => friends.map(_.reverse))
  val flatten = udf((listOfFriends: Seq[Seq[String]]) => listOfFriends.flatten.toList)

  initDf.groupBy("person").agg(reverse_friends_name(flatten(collect_set("friends")))).show

}

The input file (input.txt) is:
sam|jenny,miller
miller|joe
sam|carl
joe|frank

which produced this output:
+------+------------------------------------+
|person|UDF(UDF(collect_set(friends, 0, 0)))|
+------+------------------------------------+
|miller|                               [eoj]|
|   joe|                             [knarf]|
|   sam|                [ynnej, rellim, l...|
+------+------------------------------------+

But with Spark 2.4.0, the code below breaks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MyApp extends App {

  val spark = SparkSession.builder
    .appName("udf check")
    .master("local[*]")
    .getOrCreate
  import spark.implicits._

  val initDf = spark.read
    .option("delimiter", "|")
    .csv("input.txt")
    .select($"_c0".alias("person"), split($"_c1", ",").alias("friends"))

  // UDF (the flatten UDF is gone; the flatten call below now resolves to the
  // built-in org.apache.spark.sql.functions.flatten introduced in 2.4.0)
  val reverse_friends_name = udf((friends: Seq[String]) => friends.map(_.reverse))

  initDf.groupBy("person").agg(reverse_friends_name(flatten(collect_set("friends")))).show

}

and produces the following error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
- object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$1841/822958001, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$1841/822958001@e097c13)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(flatten(collect_set(friends#15, 0, 0)#20)))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(flatten(collect_set(friends#15, 0, 0)#20)) AS UDF(flatten(collect_set(friends, 0, 0)))#21)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(person#14, UDF(flatten(collect_set(friends#15, 0, 0)#20)) AS UDF(flatten(collect_set(friends, 0, 0)))#21))
- field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name: resultExpressions, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, ObjectHashAggregate(keys=[person#14], functions=[collect_set(friends#15, 0, 0)], output=[person#14, UDF(flatten(collect_set(friends, 0, 0)))#21])

I cannot find much documentation on this behavior. Was it removed in favor of the newly added collection functions?
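
(Note: Spark 2.4.0 did add built-in collection functions, including flatten and the higher-order transform SQL function, so the same result can be obtained with no UDF at all. Below is a minimal sketch under that assumption, against the same input.txt; the object name BuiltinApp and the reversed_friends column name are illustrative, not from the original post.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object BuiltinApp extends App {

  val spark = SparkSession.builder
    .appName("builtin check")
    .master("local[*]")
    .getOrCreate
  import spark.implicits._

  val initDf = spark.read
    .option("delimiter", "|")
    .csv("input.txt")
    .select($"_c0".alias("person"), split($"_c1", ",").alias("friends"))

  initDf
    .groupBy("person")
    // collect_set yields an array<array<string>> per person
    .agg(collect_set("friends").alias("friend_sets"))
    // flatten is the built-in added in 2.4.0; transform(..., x -> reverse(x))
    // is a 2.4.0 higher-order SQL function (only reachable via expr/selectExpr
    // in 2.4) that reverses each string, replacing the reverse_friends_name UDF
    .select($"person", expr("transform(flatten(friend_sets), x -> reverse(x))").alias("reversed_friends"))
    .show

}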

Best Answer

If your code throws the error above, change the Scala version of your Spark build from 2.12 to 2.11 and everything will run. Spark 2.4.0 was the first release to support Scala 2.12 (still experimental at the time), and under Scala 2.12 the lambda behind a ScalaUDF can capture a scala.runtime.LazyRef, which is not serializable; that is exactly the NotSerializableException in the stack trace above. In my case I was on a Spark 3.x setup, where Scala 2.12 is the default; I switched back to Spark 2.4 built against Scala 2.11 and everything worked.
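
For completeness, here is a minimal build.sbt sketch pinning the combination the answer reports working (Spark 2.4.x built against Scala 2.11); the exact patch versions are illustrative:

// build.sbt -- minimal sketch; pinning Scala 2.11 makes sbt resolve the _2.11 Spark artifacts
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix, so these resolve to spark-core_2.11 / spark-sql_2.11
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-sql"  % "2.4.0"
)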

Regarding "apache-spark - Spark 2.4.0 functions.udf does not work with collections", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53494559/
