gpt4 book ai didi

apache-spark - 如何将整行传递给 UDF - Spark DataFrame 过滤器

转载 作者:行者123 更新时间:2023-12-03 11:55:26 26 4
gpt4 key购买 nike

我正在为具有很多内部结构的复杂 JSON 数据集编写过滤器函数。传递单个列太麻烦了。

所以我声明了以下UDF:

val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))

直觉上,我认为它会像这样工作:
records.filter("myFilter(*)=true")

实际的语法是什么?

最佳答案

您必须使用 struct()在调用函数时构造行的函数,请按照下列步骤操作。

导入行,

import org.apache.spark.sql._

定义 UDF
def myFilterFunction(r:Row) = {r.get(0)==r.get(1)} 

注册 UDF
sqlContext.udf.register("myFilterFunction", myFilterFunction _)

创建数据框
val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

使用 UDF
records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show

当您希望将所有列传递给 UDF 时。
records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show 

结果:
+------+------+
| text| text2|
+------+------+
|sachin|sachin|
+------+------+

关于apache-spark - 如何将整行传递给 UDF - Spark DataFrame 过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31816975/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com