dataframe - Pyspark window function over an entire dataframe

Consider a pyspark dataframe. I want to summarize the entire dataframe, per column, and append the result to every row.

+-----+----------+-----------+
|index|      col1|       col2|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|

Expected result:
+-----+----------+-----------+--------+---------+--------+---------+
|index|      col1|       col2|col1_min|col1_mean|col2_min|col2_mean|
+-----+----------+-----------+--------+---------+--------+---------+
|  0.0|0.58734024|0.085703015|      -5|      2.3|      -2|      1.4|
|  1.0|0.67304325| 0.17850411|      -5|      2.3|      -2|      1.4|

As far as I understand, I need to use a Window function with the whole dataframe as the window, so that the result is kept on every row (rather than, say, computing the statistics separately and then joining them back to replicate the values onto each row).

My questions are:
  • How do I write a Window without any partitioning or ordering?

  • I know the standard window with a partition and an order, but not a window that treats everything as a single partition:
    w = Window.partitionBy("col1", "col2").orderBy(desc("col1"))
    df = df.withColumn("col1_mean", mean("col1").over(w))

    How would I write a window that treats everything as one partition?
  • Is there any way to write this dynamically for all columns?

  • Suppose I have 500 columns; writing this out repeatedly does not look good:
    df = df.withColumn("col1_mean", mean("col1").over(w)).withColumn("col1_min", min("col1").over(w)).withColumn("col2_mean", mean("col2").over(w))...

    Suppose I want multiple statistics for every column, so each colx would spawn colx_min, colx_max, colx_mean.

Best answer

Instead of using a window, you can achieve the same result by combining a custom aggregation with a cross join:

import pyspark.sql.functions as F
from pyspark.sql.functions import broadcast
from itertools import chain

df = spark.createDataFrame([
    [1, 2.3, 1],
    [2, 5.3, 2],
    [3, 2.1, 4],
    [4, 1.5, 5]
], ["index", "col1", "col2"])

# build a (min, max, mean) tuple of aggregate expressions for every "col*" column
agg_cols = [(
    F.min(c).alias("min_" + c),
    F.max(c).alias("max_" + c),
    F.mean(c).alias("mean_" + c))
    for c in df.columns if c.startswith('col')]

# single-row dataframe holding all the statistics
stats_df = df.agg(*list(chain(*agg_cols)))

# there is no performance impact from crossJoin since we have only one row on the
# right table, which we broadcast (most likely Spark would broadcast it anyway)
df.crossJoin(broadcast(stats_df)).show()

# +-----+----+----+--------+--------+---------+--------+--------+---------+
# |index|col1|col2|min_col1|max_col1|mean_col1|min_col2|max_col2|mean_col2|
# +-----+----+----+--------+--------+---------+--------+--------+---------+
# |    1| 2.3|   1|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    2| 5.3|   2|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    3| 2.1|   4|     1.5|     5.3|      2.8|       1|       5|      3.0|
# |    4| 1.5|   5|     1.5|     5.3|      2.8|       1|       5|      3.0|
# +-----+----+----+--------+--------+---------+--------+--------+---------+

Note 1: By using broadcast we avoid a shuffle, since the broadcast df is sent to all the executors.

Note 2: With chain(*agg_cols) we flatten the list of tuples created in the previous step.
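
As a minimal illustration of that flattening (a sketch using plain strings in place of the actual Column objects, purely for demonstration):

from itertools import chain

# agg_cols above is a list of 3-tuples (one min/max/mean tuple per input column);
# chain(*...) concatenates those tuples into one flat sequence, which is the
# argument list that df.agg(*...) expects.
agg_like = [("min_col1", "max_col1", "mean_col1"),
            ("min_col2", "max_col2", "mean_col2")]
print(list(chain(*agg_like)))
# ['min_col1', 'max_col1', 'mean_col1', 'min_col2', 'max_col2', 'mean_col2']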

Update:

Here is the execution plan of the program above:
== Physical Plan ==
*(3) BroadcastNestedLoopJoin BuildRight, Cross
:- *(3) Scan ExistingRDD[index#196L,col1#197,col2#198L]
+- BroadcastExchange IdentityBroadcastMode, [id=#274]
   +- *(2) HashAggregate(keys=[], functions=[finalmerge_min(merge min#233) AS min(col1#197)#202, finalmerge_max(merge max#235) AS max(col1#197)#204, finalmerge_avg(merge sum#238, count#239L) AS avg(col1#197)#206, finalmerge_min(merge min#241L) AS min(col2#198L)#208L, finalmerge_max(merge max#243L) AS max(col2#198L)#210L, finalmerge_avg(merge sum#246, count#247L) AS avg(col2#198L)#212])
      +- Exchange SinglePartition, [id=#270]
         +- *(1) HashAggregate(keys=[], functions=[partial_min(col1#197) AS min#233, partial_max(col1#197) AS max#235, partial_avg(col1#197) AS (sum#238, count#239L), partial_min(col2#198L) AS min#241L, partial_max(col2#198L) AS max#243L, partial_avg(col2#198L) AS (sum#246, count#247L)])
            +- *(1) Project [col1#197, col2#198L]
               +- *(1) Scan ExistingRDD[index#196L,col1#197,col2#198L]

Here we can see a BroadcastExchange of a SinglePartition broadcasting a single row, since stats_df fits into a SinglePartition. Therefore the data shuffled here is only one row (the minimum possible).
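
For completeness, the single-partition window asked about in the question can also be written directly. The following is a minimal sketch, not part of the original answer: it assumes Window.partitionBy() called with no columns, which Spark treats as one global partition (and for which it warns that all data is moved to a single partition), and it reuses the df defined above.

import pyspark.sql.functions as F
from pyspark.sql import Window

# an unpartitioned, unordered window spans the whole dataframe
w = Window.partitionBy()

# dynamically build min/max/mean columns for every "col*" column
stat_cols = [f(c).over(w).alias(c + "_" + name)
             for c in df.columns if c.startswith("col")
             for name, f in [("min", F.min), ("max", F.max), ("mean", F.mean)]]

df.select("*", *stat_cols).show()

Because such a window forces all rows into a single partition, the aggregation-plus-cross-join approach above scales better on large dataframes.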

Regarding dataframe - Pyspark window function over an entire dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60418329/
