gpt4 book ai didi

scala - 将元组列表作为参数传递给scala中的spark udf

转载 作者:行者123 更新时间:2023-12-05 00:53:13 25 4
gpt4 key购买 nike

我正在尝试将元组列表传递给 scala 中的 udf。我不确定如何为此准确定义数据类型。我试图将它作为一整行传递,但它无法真正解决它。我需要根据元组的第一个元素对列表进行排序,然后返回 n 个元素。我已经为 udf 尝试了以下定义

def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Row)

这是 idList 的样子:
[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]

这是一个像上面一样有 100 行的数据框。我将 udf 称为如下:
testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show

当我打印数据帧的模式时,它会将其作为结构数组读取。 idList 本身是通过对按键分组并存储在数据帧中的元组列执行收集列表生成的。关于我做错了什么的任何想法?谢谢!

最佳答案

定义 UDF 时,您应该使用普通 Scala 类型(例如元组、基元...)而不是 Spark SQL 类型(例如 StructType )作为输出类型。

至于输入类型——这是它变得棘手的地方(并且没有很好的记录)——元组数组实际上是一个 mutable.WrappedArray[Row] .所以 - 你必须 “转换”首先将每一行变成一个元组,然后您可以进行排序并返回结果。

最后,根据你的描述,id column 根本没有使用,所以我从 UDF 定义中删除了它,但它可以很容易地添加回来。

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
// converts the array items into tuples, sorts by first item and returns first two tuples:
idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}

df.withColumn("result", udfFilterPath($"idList")).show(false)

+------+-------------------------------------------+----------------------------+
|id |idList |result |
+------+-------------------------------------------+----------------------------+
|1234 |[[1234,Tony], [2345,Angela]] |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+

关于scala - 将元组列表作为参数传递给scala中的spark udf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41551410/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com