scala - Group DataFrame by column "grp" and compress it (take the last non-null value for each column, ordering by column "ord")


Suppose I have the following DataFrame:

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 3|null| 11|
| 2| null| 2| xxx| 22|
| 1| null| 1| yyy|null|
| 2| null| 7|null| 33|
| 1| null| 12|null|null|
| 2| null| 19|null| 77|
| 1| null| 10| s13|null|
| 2| null| 11| a23|null|
+---+--------+---+----+----+

Here is the same sample DF with comments, sorted by grp and ord:
scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 1| yyy|null|
| 1| null| 3|null| 11| # grp:1 - last value for `col2` (11)
| 1| null| 10| s13|null| # grp:1 - last value for `col1` (s13)
| 1| null| 12|null|null| # grp:1 - last values for `null_col`, `ord`
| 2| null| 2| xxx| 22|
| 2| null| 7|null| 33|
| 2| null| 11| a23|null| # grp:2 - last value for `col1` (a23)
| 2| null| 19|null| 77| # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+

I want to compress it, i.e. to group by column "grp" and, for each group, sort the rows by column "ord" and take the last non-null value in each column (if there is one):
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 12| s13| 11|
| 2| null| 19| a23| 77|
+---+--------+---+----+----+

I have seen the following similar questions:
  • How to select the first row of each group?
  • How to find first non-null values in groups? (secondary sorting using dataset api)

  • But my real DataFrame has more than 250 columns, so I need a solution where I don't have to specify all the columns explicitly.

    I can't wrap my head around it...

    MCVE: how to create the sample DataFrame:
  • Create a local file "/tmp/data.txt" and copy-paste the contents of the DataFrame as posted above
  • Define the function readSparkOutput() (a sketch follows below this list)
  • Parse "/tmp/data.txt" into a DataFrame:
    val df = readSparkOutput("file:///tmp/data.txt")
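
    Since the body of readSparkOutput() isn't reproduced here, the following is a minimal sketch of what it could look like (my assumption, not the original helper): it reads the df.show() dump as pipe-delimited CSV, skips the "+---+" border lines, drops the empty edge columns, and maps the literal string "null" back to real nulls (all columns stay strings):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, when}

    def readSparkOutput(filePath: String): DataFrame = {
      val raw = spark.read
        .option("header", "true")
        .option("delimiter", "|")
        .option("ignoreLeadingWhiteSpace", "true")
        .option("ignoreTrailingWhiteSpace", "true")
        .option("comment", "+")   // skip the "+---+...+" border lines
        .csv(filePath)

      // The leading and trailing "|" of each row yield empty, auto-named "_c..." columns.
      val trimmed = raw.select(raw.columns.filterNot(_.startsWith("_c")).map(raw(_)): _*)

      // Replace the literal string "null" with real nulls (when without otherwise yields null).
      trimmed.columns.foldLeft(trimmed)((acc, c) =>
        acc.withColumn(c, when(col(c) =!= "null", col(c))))
    }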


  • UPDATE: I think it should be something like the following SQL:
    SELECT grp, ord, null_col, col1, col2
    FROM (
      SELECT
        grp,
        ord,
        FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) AS null_col,
        FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) AS col1,
        FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) AS col2,
        ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) AS rn
      FROM table_name
    ) AS v
    WHERE v.rn = 1;

    How can we dynamically generate such a Spark query?
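
    One way to generate such a query dynamically (a sketch under my assumptions, not from the original post) is to build the SQL string from df.columns. Note the explicit ROWS BETWEEN frame: with an ORDER BY, the default window frame stops at the current row, so FIRST(..., true) on the top row would not see the rest of the group:

    df.createOrReplaceTempView("table_name")

    val keyCols = Seq("grp")
    val aggCols = df.columns.filterNot(keyCols.contains)

    // FIRST(c, true) ignores nulls; the full frame makes every row see the whole group.
    val firstExprs = aggCols.map(c =>
      s"FIRST($c, true) OVER (PARTITION BY grp ORDER BY ord DESC " +
      s"ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS $c")

    val query = s"""
      |SELECT ${df.columns.mkString(", ")}
      |FROM (
      |  SELECT grp, ${firstExprs.mkString(",\n    ")},
      |    ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) AS rn
      |  FROM table_name
      |) v
      |WHERE v.rn = 1""".stripMargin

    spark.sql(query).show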



    I tried the following simplified approach:
    import org.apache.spark.sql.expressions.Window

    val win = Window
      .partitionBy("grp")
      .orderBy($"ord".desc)

    val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))

    It produces:
    scala> cols
    res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)

    But I can't pass it to df.select:
    scala> df.select(cols.head, cols.tail: _*).show
    <console>:34: error: no `: _*' annotation allowed here
    (such annotations are only allowed in arguments to *-parameters)
    df.select(cols.head, cols.tail: _*).show

    Another attempt:
    scala> df.select(cols.map(col): _*).show
    <console>:34: error: type mismatch;
    found : String => org.apache.spark.sql.Column
    required: org.apache.spark.sql.Column => ?
    df.select(cols.map(col): _*).show
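
    As an aside (my note, not part of the original post): both errors stem from cols already being an Array[Column], so neither the head/tail split nor the extra map(col) is needed. The plain varargs expansion type-checks; whether the window semantics then yield the desired single row per group is a separate matter, addressed in the answer below:

    df.select(cols: _*).show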

    Best Answer

    Consider the following approach: apply the window function last(c, ignoreNulls=true), ordered by "ord" within each "grp", to each of the columns to be aggregated, followed by a groupBy("grp") to fetch the first agg(colFcnMap) result:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val df0 = Seq(
      (1, 3, None, Some(11)),
      (2, 2, Some("aaa"), Some(22)),
      (1, 1, Some("s12"), None),
      (2, 7, None, Some(33)),
      (1, 12, None, None),
      (2, 19, None, Some(77)),
      (1, 10, Some("s13"), None),
      (2, 11, Some("a23"), None)
    ).toDF("grp", "ord", "col1", "col2")

    val df = df0.withColumn("null_col", lit(null))

    df.orderBy("grp", "ord").show
    // +---+---+----+----+--------+
    // |grp|ord|col1|col2|null_col|
    // +---+---+----+----+--------+
    // | 1| 1| s12|null| null|
    // | 1| 3|null| 11| null|
    // | 1| 10| s13|null| null|
    // | 1| 12|null|null| null|
    // | 2| 2| aaa| 22| null|
    // | 2| 7|null| 33| null|
    // | 2| 11| a23|null| null|
    // | 2| 19|null| 77| null|
    // +---+---+----+----+--------+

    // Frame from the current row to the end of the partition: last(c, ignoreNulls=true)
    // then yields, for each row, the last non-null value from that row onward.
    val win = Window.partitionBy("grp").orderBy("ord").
      rowsBetween(0, Window.unboundedFollowing)

    val nonAggCols = Array("grp")
    val cols = df.columns.diff(nonAggCols) // Columns to be aggregated

    val colFcnMap = cols.zip(Array.fill(cols.size)("first")).toMap
    // colFcnMap: scala.collection.immutable.Map[String,String] =
    // Map(ord -> first, col1 -> first, col2 -> first, null_col -> first)

    // Overwrite each aggregatable column with its forward-looking last non-null
    // value, then collapse to one row per "grp" and strip the generated
    // "first(...)" names back to the original column names.
    cols.foldLeft(df)((acc, c) =>
        acc.withColumn(c, last(c, ignoreNulls = true).over(win))
      ).
      groupBy("grp").agg(colFcnMap).
      select(col("grp") :: colFcnMap.toList.map{ case (c, f) => col(s"$f($c)").as(c) }: _*).
      show
    // +---+---+----+----+--------+
    // |grp|ord|col1|col2|null_col|
    // +---+---+----+----+--------+
    // | 1| 12| s13| 11| null|
    // | 2| 19| a23| 77| null|
    // +---+---+----+----+--------+

    Note that the final select is used to strip the function name (in this case first()) from the aggregated column names.
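
    For illustration, here is a sketch (my addition) of the names that agg(colFcnMap) produces before that renaming; note that Map iteration order, and hence column order, is not guaranteed:

    df.groupBy("grp").agg(colFcnMap).columns
    // e.g. Array(grp, first(ord), first(col1), first(col2), first(null_col))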

    Regarding "scala - Group DataFrame by column "grp" and compress it (take the last non-null value for each column, ordering by column "ord")", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53154848/
