gpt4 book ai didi

scala - Spark 数据帧 : does groupBy after orderBy maintain that order?

转载 作者:行者123 更新时间:2023-12-03 14:46:21 25 4
gpt4 key购买 nike

我有一个 Spark 2.0 数据框 example具有以下结构:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序。

我创建了一个聚合器 groupConcat :
  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""

override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

override def merge(b1: String, b2: String) = b1 + b2

override def finish(b: String) = b.substring(1)

override def bufferEncoder: Encoder[String] = Encoders.STRING

override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn

它帮助我将列连接成字符串以获得最终的数据帧:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

我的问题是,如果我这样做 example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count") ,这是否保证每小时计数将在各自的存储桶中正确排序?

我读到 RDD 不一定是这种情况(请参阅 Spark sort by key and then group by to get ordered iterable? ),但也许 DataFrames 不同?

如果没有,我该如何解决?

最佳答案

正如其他人指出的那样, orderBy 之后的 groupBy 不维护订单。您想要做的是使用 Window 函数 - 对 id 进行分区并按小时排序。您可以在此之上 collect_list ,然后获取结果列表的最大值(最大),因为它们是累积的(即第一个小时将只在列表中,第二个小时将在列表中包含 2 个元素,依此类推)。

完整示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(( "id1", 0, 12),
("id1", 1, 55),
("id1", 23, 44),
("id2", 0, 12),
("id2", 1, 89),
("id2", 23, 34)).toDF("id", "hour", "count")

val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
data.withColumn("collected", collect_list($"count")
.over(Window.partitionBy("id")
.orderBy("hour")))
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count").show

这使我们保持在 DataFrame 世界中。我还简化了 OP 使用的 UDF 代码。

输出:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+

关于scala - Spark 数据帧 : does groupBy after orderBy maintain that order?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39505599/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com