gpt4 book ai didi

apache-spark - 在 Spark UDF 中操作数据框

转载 作者:行者123 更新时间:2023-12-04 04:54:18 24 4
gpt4 key购买 nike

我有一个 UDF 可以从数据框中过滤和选择值,但它遇到了“对象不可序列化”错误。详情如下。

假设我有一个数据框 df1,其中的列名称为 ("ID"、"Y1"、"Y2"、"Y3"、"Y4"、"Y5"、"Y6"、"Y7"、"Y8", "Y9", "Y10").我想根据另一个数据框 df2 中匹配的“ID”和“值”对“Y”列的一个子集求和。我尝试了以下方法:

val y_list = ("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))

def udf_test(ID: String, value: Int): Double = {
df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)

val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))

这给了我以下形式的错误:

java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)

我查了一下,发现 Spark Column 不可序列化。我想知道:

1) 有什么方法可以在 UDF 中操作数据帧?

2) 如果不是,实现上述操作类型的最佳方法是什么?我的真实情况比这更复杂。它要求我根据大数据框中的某些列从多个小数据框中选择值,并将值计算回大数据框中。

我正在使用 Spark 1.6.3。谢谢!

最佳答案

您不能在 UDF 中使用数据集操作。 UDF 只能对现有列进行操作并生成一个结果列。它不能过滤数据集或进行聚合,但可以在过滤器内部使用。 UDAF 还可以聚合值。

相反,您可以使用 .as[SomeCaseClass] 从 DataFrame 生成 Dataset,并在 filter、map、reduce 中使用普通的强类型函数。

编辑:如果你想加入你的 bigDF 和 smallDFs 列表中的每个小 DF,你可以这样做:

import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))

broadcast 是给small DF添加Broadcast Hint的函数,让small DF使用更高效的Broadcast Join而不是Sort Merge Join

关于apache-spark - 在 Spark UDF 中操作数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48893002/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com