
apache-spark - How do I limit functions.collect_set in Spark SQL?


I am working with a column of numbers in a large Spark DataFrame, and I would like to create a new column that stores an aggregated list of the unique numbers appearing in that column.

This is essentially what functions.collect_set does. However, I only need at most 1000 elements in the aggregated list. Is there any way to pass that parameter to functions.collect_set(), or any other way to get at most 1000 elements into the aggregated list, without using a UDAF?

Since the column is very large, I would like to avoid collecting all of the elements and trimming the list afterwards.
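For context, a minimal sketch of what a plain collect_set aggregation looks like today, with no size cap (the DataFrame df, grouping column key and numeric column num are hypothetical):
import org.apache.spark.sql.functions.collect_set

// Collects every distinct value of "num" per "key"; the resulting array
// can grow as large as the number of distinct values in the group.
val allDistinct = df
  .groupBy("key")
  .agg(collect_set("num") as "nums")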

Thanks!

Best Answer

Spark 2.4
As pointed out in the comments, Spark 2.4.0 comes with the slice standard function, which can do exactly this kind of thing.

val usage = sql("describe function slice").as[String].collect()(2)
scala> println(usage)
Usage: slice(x, start, length) - Subsets array x starting from index start (array indices start at 1, or starting from the end if start is negative) with the specified length.
This gives the following query:
val q = input
.groupBy('key)
.agg(collect_set('id) as "collect")
.withColumn("three_only", slice('collect, 1, 3))
scala> q.show(truncate = false)
+---+--------------------------------------+------------+
|key|collect |three_only |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
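The same capping can also be written in one pass as a plain SQL expression, since slice accepts the result of collect_set directly. A sketch, assuming the input is registered under the hypothetical view name input_view:
input.createOrReplaceTempView("input_view")

// slice applied straight to the aggregated array (Spark 2.4+)
val q2 = spark.sql("""
  SELECT key, slice(collect_set(id), 1, 3) AS three_only
  FROM input_view
  GROUP BY key
""")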
Before Spark 2.4
I would use a UDF to do what you want after collect_set (or collect_list), or the much harder route of a UDAF.
Given more experience with UDFs, I would go with that first. Even though UDFs are not optimized, for this use case they are fine.
val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) } // keep at most `limit` elements
val sample = spark.range(50).withColumn("key", $"id" % 5)                // 50 ids spread over 5 keys

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all |
+---+--------------------------------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+

scala> sample.
groupBy("key").
agg(collect_set("id") as "all").
withColumn("limit(3)", limitUDF($"all", lit(3))).
show(false)
+---+--------------------------------------+------------+
|key|all |limit(3) |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
See the functions object (for the udf function's documentation).
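For completeness, the same truncating function can also be registered for use from SQL, which avoids a UDAF on versions before 2.4 as well. A sketch, assuming the hypothetical function name limit_set and view name sample_view:
// Register the Scala function so it is callable from SQL.
spark.udf.register("limit_set", (nums: Seq[Long], limit: Int) => nums.take(limit))

sample.createOrReplaceTempView("sample_view")
spark.sql("""
  SELECT key, limit_set(collect_set(id), 3) AS limited
  FROM sample_view
  GROUP BY key
""").show(false)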

Regarding "apache-spark - How do I limit functions.collect_set in Spark SQL?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38730912/
