
apache-spark - Spark window functions vs. groupBy: performance question


I have a dataframe:

| id  | date       | KPI_1 | ... | KPI_n |
| 1   | 2012-12-12 | 0.1   | ... | 0.5   |
| 2   | 2012-12-12 | 0.2   | ... | 0.4   |
| 3   | 2012-12-12 | 0.66  | ... | 0.66  |
| 1   | 2012-12-13 | 0.2   | ... | 0.46  |
| 4   | 2012-12-14 | 0.2   | ... | 0.45  |
| ... |            |       |     |       |
| 55  | 2013-03-15 | 0.5   | ... | 0.55  |

We have:
  • X identifiers
  • one row per identifier for a given date
  • n KPIs
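
For reference, here is a minimal, hypothetical sketch of a toy dataframe with this shape (two KPI columns standing in for KPI_1 ... KPI_n); the values simply mirror the table above and the variable names are illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("kpi-diff-sketch").getOrCreate()
    import spark.implicits._

    // Toy version of the dataframe described above: one row per (id, date).
    val myDF = Seq(
      ("1", "2012-12-12", 0.1, 0.5),
      ("2", "2012-12-12", 0.2, 0.4),
      ("3", "2012-12-12", 0.66, 0.66),
      ("1", "2012-12-13", 0.2, 0.46),
      ("4", "2012-12-14", 0.2, 0.45)
    ).toDF("id", "date", "KPI_1", "KPI_2")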

For each row I have to compute some derived KPIs, and each derived KPI depends on the previous values for that id. Say my derived KPI is a diff; it would look like this (a lag-based sketch of this particular diff follows the table):

| id  | date       | KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff |
| 1   | 2012-12-12 | 0.1   | ... | 0.5   | 0.1        | 0.5        |
| 2   | 2012-12-12 | 0.2   | ... | 0.4   | 0.2        | 0.4        |
| 3   | 2012-12-12 | 0.66  | ... | 0.66  | 0.66       | 0.66       |
| 1   | 2012-12-13 | 0.2   | ... | 0.46  | 0.2 - 0.1  | 0.46 - 0.5 |
| 4   | 2012-12-14 | 0.2   | ... | 0.45  | ...        | ...        |
| ... |            |       |     |       |            |            |
| 55  | 2013-03-15 | 0.5   | ... | 0.55  |            |            |
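
Just to make the target concrete, here is a hedged sketch of how this particular per-id diff could be expressed directly with lag over an id/date window (using the toy dataframe sketched above); it covers only the diff, not the more general statistics discussed below, and byId / lagDiffDF are illustrative names:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, lag, lit}

    // Difference with the previous row of the same id (ordered by date); a missing
    // previous value defaults to 0.0, so the first row per id keeps its own KPI
    // value, as in the table above. Repeat for the remaining KPI columns.
    val byId = Window.partitionBy("id").orderBy("date")

    val lagDiffDF = myDF
      .withColumn("KPI_1_diff", col("KPI_1") - coalesce(lag("KPI_1", 1).over(byId), lit(0.0)))
      .withColumn("KPI_2_diff", col("KPI_2") - coalesce(lag("KPI_2", 1).over(byId), lit(0.0)))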

Now, what I would do is:

    val groupedDF = myDF.groupBy("id").agg(
      collect_list(struct(col("date"), col("KPI_1"))).as("wrapped_KPI_1"),
      collect_list(struct(col("date"), col("KPI_2"))).as("wrapped_KPI_2")
      // ... up to the nth KPI
    )

I would get aggregated data such as:

    [("2012-12-12", 0.1), ("2012-12-12", 0.2), ...]

Then I would sort this wrapped data, unwrap it, and map over the aggregated results with some UDFs to produce the output (compute the diffs and other statistics).
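
Here is a hedged sketch of what that unwrap-and-diff step could look like for a single KPI. It assumes date is stored as an ISO-8601 string (so lexical order equals chronological order) and the KPI values are doubles; diffsUdf and groupedDiffDF are illustrative names, not part of the original code:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, udf}

    // Sort each id's collected (date, KPI_1) structs by date, then emit the
    // consecutive differences; the first entry keeps its own value (diff vs 0.0).
    val diffsUdf = udf { wrapped: Seq[Row] =>
      val values = wrapped
        .map(r => (r.getAs[String]("date"), r.getAs[Double]("KPI_1")))
        .sortBy(_._1)
        .map(_._2)
      values.zip(0.0 +: values).map { case (cur, prev) => cur - prev }
    }

    val groupedDiffDF = groupedDF.withColumn("KPI_1_diff", diffsUdf(col("wrapped_KPI_1")))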

The alternative approach is to use window functions, such as:

    val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding, 0L)

and do:

    val windowedDF = df.select(
      col("id"),
      col("date"),
      col("KPI_1"),
      collect_list(struct(col("date"), col("KPI_1"))).over(window),
      collect_list(struct(col("date"), col("KPI_2"))).over(window)
    )

This way I get:

    [("2012-12-12", 0.1)]
    [("2012-12-12", 0.1), ("2012-12-13", 0.2)]
    ...

This looks nicer to process, but I suspect that repeating the window would produce unnecessary grouping and sorting for each KPI.
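
One way to test that suspicion, rather than guess, is to inspect the physical plan and count the shuffle and sort steps (this is exactly what the answer below does with explain()):

    // Each "Exchange" in the plan is a shuffle and each "Sort" a per-partition sort;
    // windows over different specs show up as additional such steps.
    windowedDF.explain()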

So here are the questions:
  • Should I rather go for the groupBy approach?
  • Or should I go for the window? If so, what is the most efficient way to do it?

Best Answer

I believe the window approach should be a better solution, but before applying the window functions you should re-partition the dataframe by id. That will shuffle the data only once, and all the window functions will then be executed on the already shuffled dataframe. I hope it helps.

The code should look like this:

    // window is the spec defined in the question:
    // Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding, 0L)
    val windowedDF = df.repartition(col("id"))
      .select(
        col("id"),
        col("date"),
        col("KPI_1"),
        col("KPI_2"),
        collect_list(struct(col("date"), col("KPI_1"))).over(window),
        collect_list(struct(col("date"), col("KPI_2"))).over(window)
      )

@Raphael Roth

Here we are aggregating over a single window, which is why you might be seeing the same execution plan. Please see the example below, where aggregation over multiple windows can be done with only one shuffle: after repartition($"batchDate"), the second physical plan contains a single Exchange, whereas the first contains two.
    // imports needed when running this outside spark-shell
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.collect_list
    import spark.implicits._

    val list = Seq(
      ("2", null, 1, 11, 1, 1),
      ("2", null, 1, 22, 2, 2),
      ("2", null, 1, 11, 1, 3),
      ("2", null, 1, 22, 2, 1),
      ("2", null, 1, 33, 1, 2),
      (null, "3", 3, 33, 1, 2),
      (null, "3", 3, 33, 2, 3),
      (null, "3", 3, 11, 1, 1),
      (null, "3", 3, 22, 2, 2),
      (null, "3", 3, 11, 1, 3)
    )

    val df = spark.sparkContext.parallelize(list).toDF("c1", "c2", "batchDate", "id", "pv", "vv")

    val c1Window = Window.partitionBy("batchDate", "c1")
    val c2Window = Window.partitionBy("batchDate", "c2")

    val agg1df = df.withColumn("c1List", collect_list("pv").over(c1Window))
      .withColumn("c2List", collect_list("pv").over(c2Window))

    val agg2df = df.repartition($"batchDate")
      .withColumn("c1List", collect_list("pv").over(c1Window))
      .withColumn("c2List", collect_list("pv").over(c2Window))


    agg1df.explain()

    == Physical Plan ==
    Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
    +- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(batchDate#16, c2#15, 1)
          +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
             +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
                +- Exchange hashpartitioning(batchDate#16, c1#14, 1)
                   +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
                      +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                         +- Scan ExternalRDDScan[obj#6]

    agg2df.explain()

    == Physical Plan ==
    Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
    +- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
       +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
          +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(batchDate#16, 1)
                +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                      +- Scan ExternalRDDScan[obj#6]

On apache-spark - Spark window functions vs. groupBy performance, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54332942/
