python - Pyspark: window / cumulative sum with a condition


Say I have data like this:

+------+-------+-------+---------------------+
| Col1 | Col2  | Col3  | Col4                |
+------+-------+-------+---------------------+
| A    | 0.532 | 0.234 | 2020-01-01 05:00:00 |
| B    | 0.242 | 0.224 | 2020-01-01 06:00:00 |
| A    | 0.152 | 0.753 | 2020-01-01 08:00:00 |
| C    | 0.149 | 0.983 | 2020-01-01 08:00:00 |
| A    | 0.635 | 0.429 | 2020-01-01 09:00:00 |
| A    | 0.938 | 0.365 | 2020-01-01 10:00:00 |
| C    | 0.293 | 0.956 | 2020-01-02 05:00:00 |
| A    | 0.294 | 0.234 | 2020-01-02 06:00:00 |
| E    | 0.294 | 0.394 | 2020-01-02 07:00:00 |
| D    | 0.294 | 0.258 | 2020-01-02 08:00:00 |
| A    | 0.687 | 0.666 | 2020-01-03 05:00:00 |
| C    | 0.232 | 0.494 | 2020-01-03 06:00:00 |
| D    | 0.575 | 0.845 | 2020-01-03 07:00:00 |
+------+-------+-------+---------------------+
I want to create another column that is:
  • the sum of Col2
  • grouped by Col1
  • including only records whose timestamp (Col4) is at least 2 hours before the current record's

  • So for this example, looking at A and summing Col2:
    +------+-------+-------+---------------------+
    | Col1 | Col2  | Col3  | Col4                |
    +------+-------+-------+---------------------+
    | A    | 0.532 | 0.234 | 2020-01-01 05:00:00 | => null, as it is the earliest
    | A    | 0.152 | 0.753 | 2020-01-01 08:00:00 | => 0.532, as 05:00:00 is >= 2 hours prior
    | A    | 0.635 | 0.429 | 2020-01-01 09:00:00 | => 0.532, as 08:00:00 is within 2 hours of 09:00, but 05:00:00 is >= 2 hours prior
    | A    | 0.938 | 0.365 | 2020-01-01 10:00:00 | => 0.532 + 0.152, as 09:00:00 is < 2 hours prior, but 08:00:00 and 05:00:00 are >= 2 hours prior
    | A    | 0.294 | 0.234 | 2020-01-01 12:00:00 | => 0.532 + 0.152 + 0.635 + 0.938, as all earlier rows on the same day are >= 2 hours prior
    | A    | 0.687 | 0.666 | 2020-01-03 05:00:00 | => null, as it is the earliest on that day
    +------+-------+-------+---------------------+
  • I thought about sorting the rows and doing a cumulative sum, but I'm not sure how to exclude the ones within the 2-hour range (a sketch of this idea follows this list).
  • I've also considered grouping and summing with a condition, but I'm not entirely sure how to do that.
  • I've also considered emitting records to fill the gaps so that every hour is populated, and then summing everything up to 2 hours back. However, that would require me to transform the data, since it doesn't naturally fall cleanly on the top of each hour; these are real, arbitrary timestamps.
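  • A minimal sketch of that first idea (not the accepted answer below), assuming Spark 2.3+, a DataFrame df, and the col1..col4 column names used in the answer: a range-based window over the epoch seconds of col4 frames only the rows that are at least 2 hours (7200 s) earlier, so a plain sum over that frame gives the running total with the exclusion built in.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Per (col1, day), order rows by epoch seconds and frame every earlier row
    # whose timestamp is at least 7200 seconds (2 hours) before the current one.
    w = (
        Window
        .partitionBy("col1", F.to_date("col4"))
        .orderBy(F.unix_timestamp("col4"))
        .rangeBetween(Window.unboundedPreceding, -7200)
    )

    # sum() over an empty frame returns null, matching the expected output for
    # the earliest record of each (col1, day) group.
    df = df.withColumn("col5", F.sum("col2").over(w))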
  • Best answer

    For Spark 2.4+, try this.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Running frame per (col1, day), ordered by the epoch seconds of col4.
    w = (
        Window()
        .partitionBy("col1", F.to_date("col4", "yyyy-MM-dd HH:mm:ss"))
        .orderBy(F.unix_timestamp("col4"))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Collect the running col2 values and their epoch seconds, zip them, keep only
    # entries at least 7200 s before the current row, and fold them into a sum.
    df.withColumn("try", F.collect_list("col2").over(w)) \
      .withColumn("try2", F.collect_list(F.unix_timestamp("col4")).over(w)) \
      .withColumn("col5", F.arrays_zip("try", "try2")).drop("try") \
      .withColumn("try3", F.element_at("try2", -1)) \
      .withColumn("col5", F.when(F.size("try2") > 1,
                                 F.expr("""aggregate(filter(col5, x -> x.try2 <= (try3 - 7200)),
                                                     cast(0 as double), (acc, y) -> acc + y.try)"""))
                           .otherwise(None)) \
      .drop("try3", "try2").orderBy("col1", "col4").show(truncate=False)

    #+----+-----+-----+-------------------+------------------+
    #|col1|col2 |col3 |col4               |col5              |
    #+----+-----+-----+-------------------+------------------+
    #|A   |0.532|0.234|2020-01-01 05:00:00|null              |
    #|A   |0.152|0.753|2020-01-01 08:00:00|0.532             |
    #|A   |0.635|0.429|2020-01-01 09:00:00|0.532             |
    #|A   |0.938|0.365|2020-01-01 10:00:00|0.684             |
    #|A   |0.294|0.234|2020-01-01 12:00:00|2.2569999999999997|
    #|A   |0.687|0.666|2020-01-03 05:00:00|null              |
    #|B   |0.242|0.224|2020-01-01 06:00:00|null              |
    #|C   |0.149|0.983|2020-01-01 08:00:00|null              |
    #|C   |0.293|0.956|2020-01-02 05:00:00|null              |
    #|C   |0.232|0.494|2020-01-03 06:00:00|null              |
    #|D   |0.294|0.258|2020-01-02 08:00:00|null              |
    #|D   |0.575|0.845|2020-01-03 07:00:00|null              |
    #|E   |0.294|0.394|2020-01-02 07:00:00|null              |
    #+----+-----+-----+-------------------+------------------+
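    The key step above is the pair of Spark SQL higher-order functions filter and aggregate (available since Spark 2.4): filter keeps only the zipped entries whose epoch second is at least 7200 s before the current row's, and aggregate folds the surviving col2 values into a sum. A minimal, self-contained illustration of the two functions on a toy array column (the toy / xs names are ours, not part of the answer):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # One row with a single array column, just to show filter + aggregate.
    toy = spark.createDataFrame([([1, 2, 3, 4],)], ["xs"])

    # Keep elements <= 2, then fold them into a sum: 1 + 2 = 3.
    toy.select(
        F.expr("aggregate(filter(xs, x -> x <= 2), cast(0 as bigint), (acc, x) -> acc + x)").alias("s")
    ).show()
    # => s = 3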

    Regarding python - Pyspark: window / cumulative sum with a condition, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63122823/
