gpt4 book ai didi

pyspark - 使用 pyspark 对列上的值求和

转载 作者:行者123 更新时间:2023-12-04 10:04:00 26 4
gpt4 key购买 nike

我有一个场景,我有 2 个表,一个表有天数,另一个表有值。因此,从有天数的表中,我需要对另一个表的相同天数的值求和。数据框

dataframe1
df1 = spark.createDataFrame(
[
('ll',5)
('yy',6)
],
('x','days')
)
dataframe2
df = spark.createDataFrame(
[
('ll','2020-01-05','1','10','50'),
('ll','2020-01-06','1','10'),
('ll','2020-01-07','1','10'),
('ll','2020-01-08','1','10'),
('ll','2020-01-09','1','10'),
('ll','2020-01-10','1','10'),
('ll','2020-01-11','1','20'),
('ll','2020-01-12','1','10'),
('ll','2020-01-05','2','30'),
('ll','2020-01-06','2','30'),
('ll','2020-01-07','2','30'),
('ll','2020-01-08','2','40'),
('ll','2020-01-09','2','30'),
('ll','2020-01-10','2','10'),
('ll','2020-01-11','2','10'),
('ll','2020-01-12','2','10'),
('yy','2020-01-05','1','20'),
('yy','2020-01-06','1','20'),
('yy','2020-01-07','1','20'),
('yy','2020-01-08','1','20'),
('yy','2020-01-09','1','20'),
('yy','2020-01-10','1','40'),
('yy','2020-01-11','1','20'),
('yy','2020-01-12','1','20'),
('yy','2020-01-05','2','40'),
('yy','2020-01-06','2','40'),
('yy','2020-01-07','2','40'),
('yy','2020-01-08','2','40'),
('yy','2020-01-09','2','40'),
('yy','2020-01-10','2','40'),
('yy','2020-01-11','2','60'),
('yy','2020-01-12','2','40')
],
('x','date','flag','value')
)

expected_dataframe = spark.createDataFrame(
[
('ll','2020-01-05','1','10','50'),
('ll','2020-01-06','1','10','50'),
('ll','2020-01-07','1','10','60'),
('ll','2020-01-08','1','10','60'),
('ll','2020-01-09','1','10','50'),
('ll','2020-01-10','1','10','40'),
('ll','2020-01-11','1','20','30'),
('ll','2020-01-12','1','10','10'),
('ll','2020-01-05','2','30','170'),
('ll','2020-01-06','2','30','140'),
('ll','2020-01-07','2','30','120'),
('ll','2020-01-08','2','40','100'),
('ll','2020-01-09','2','30','60'),
('ll','2020-01-10','2','10','30'),
('ll','2020-01-11','2','10','20'),
('ll','2020-01-12','2','10','10'),
('yy','2020-01-05','1','20','140'),
('yy','2020-01-06','1','20','140'),
('yy','2020-01-07','1','20','140'),
('yy','2020-01-08','1','20','120'),
('yy','2020-01-09','1','20','100'),
('yy','2020-01-10','1','40','80'),
('yy','2020-01-11','1','20','40'),
('yy','2020-01-12','1','20','20'),
('yy','2020-01-05','2','40','240'),
('yy','2020-01-06','2','40','260'),
('yy','2020-01-07','2','40','260'),
('yy','2020-01-08','2','40','220'),
('yy','2020-01-09','2','40','180'),
('yy','2020-01-10','2','40','140'),
('yy','2020-01-11','2','60','100'),
('yy','2020-01-12','2','40','40')
],
('x','date','flag','value','result')

预期结果

    +---+----------+----+-----+------+
| x| date|flag|value|result|
+---+----------+----+-----+------+
| ll|2020-01-05| 1| 10| 50|
| ll|2020-01-06| 1| 10| 50|
| ll|2020-01-07| 1| 10| 60|
| ll|2020-01-08| 1| 10| 60|
| ll|2020-01-09| 1| 10| 50|
| ll|2020-01-10| 1| 10| 40|
| ll|2020-01-11| 1| 20| 30|
| ll|2020-01-12| 1| 10| 10|
| ll|2020-01-05| 2| 30| 170|
| ll|2020-01-06| 2| 30| 140|
| ll|2020-01-07| 2| 30| 120|
| ll|2020-01-08| 2| 40| 100|
| ll|2020-01-09| 2| 30| 60|
| ll|2020-01-10| 2| 10| 30|
| ll|2020-01-11| 2| 10| 20|
| ll|2020-01-12| 2| 10| 10|
| yy|2020-01-05| 1| 20| 140|
| yy|2020-01-06| 1| 20| 140|
| yy|2020-01-07| 1| 20| 140|
| yy|2020-01-08| 1| 20| 120|
| yy|2020-01-09| 1| 20| 100|
| yy|2020-01-10| 1| 40| 80|
| yy|2020-01-11| 1| 20| 40|
| yy|2020-01-12| 1| 20| 20|
| yy|2020-01-05| 2| 40| 240|
| yy|2020-01-06| 2| 40| 260|
| yy|2020-01-07| 2| 40| 260|
| yy|2020-01-08| 2| 40| 220|
| yy|2020-01-09| 2| 40| 180|
| yy|2020-01-10| 2| 40| 140|
| yy|2020-01-11| 2| 60| 100|
| yy|2020-01-12| 2| 40| 40|
+---+----------+----+-----+------+

代码

from pyspark.sql.window import Window
from pyspark.sql.functions import *
df_join = df.join(df1,['x'],'inner').withColumn('date',to_date(col('date'),'yyyy-MM-dd'))
from pyspark.sql.window import Window
w1 = Window.partitionBy('x','flag').orderBy(col['date'].desc())

所以我需要基于天数列对值列求和,即如果天数列为 5,我需要对 5 行值求和。

我加入了这两个表并使用我试图解决的窗口函数,但 id 没有解决并且无法弄清楚如何解决它。任何人都可以告诉我如何解决它

最佳答案

首先您可以在 x 上加入,然后在您的行上创建一个 row_number(),这将用于在大于天数的地方挑出(将它们变成nulls),然后对分区求和只有窗口 在所有行中广播您的总和。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-dd-MM"))
w1=Window().partitionBy("x","flag")
df.join(df1, ['x'])\
.withColumn("rowNum", F.row_number().over(w))\
.withColumn("expected_result", F.sum(F.when(F.col("rowNum")>F.col("days")\
,F.lit(None)).otherwise(F.col("value")))\
.over(w1)).drop("days","rowNum").show()

#+---+----------+----+-----+---------------+
#| x| date|flag|value|expected_result|
#+---+----------+----+-----+---------------+
#| ll|2020-01-05| 1| 10| 50.0|
#| ll|2020-01-06| 1| 10| 50.0|
#| ll|2020-01-07| 1| 10| 50.0|
#| ll|2020-01-08| 1| 10| 50.0|
#| ll|2020-01-09| 1| 10| 50.0|
#| ll|2020-01-10| 1| 10| 50.0|
#| ll|2020-01-11| 1| 10| 50.0|
#| ll|2020-01-12| 1| 10| 50.0|
#| ll|2020-01-05| 2| 30| 150.0|
#| ll|2020-01-06| 2| 30| 150.0|
#| ll|2020-01-07| 2| 30| 150.0|
#| ll|2020-01-08| 2| 30| 150.0|
#| ll|2020-01-09| 2| 30| 150.0|
#| ll|2020-01-10| 2| 10| 150.0|
#| ll|2020-01-11| 2| 10| 150.0|
#| ll|2020-01-12| 2| 10| 150.0|
#| yy|2020-01-05| 1| 20| 120.0|
#| yy|2020-01-06| 1| 20| 120.0|
#| yy|2020-01-07| 1| 20| 120.0|
#| yy|2020-01-08| 1| 20| 120.0|
#+---+----------+----+-----+---------------+
#only showing top 20 rows

更新:

对于 Spark2.4+,您可以在 之后使用高阶函数 transformaggregate>collect_list。我假设数据按照所提供的示例进行排序,如果不是这种情况,则需要添加一个额外的步骤来确保这一点。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("x","flag")
w1=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-dd-MM"))

df.join(df1,['x'])\
.withColumn("result", F.collect_list("value").over(w))\
.withColumn("rowNum", F.row_number().over(w1)-1)\
.withColumn("result", F.expr("""aggregate(transform(result,(x,i)->array(x,i)),0,(acc,x)-> \
IF((int(x[1])>=rowNum)and(int(x[1])<days+rowNum),int(x[0])+acc,acc))"""))\
.drop("flag","rowNum","days").show()


#+---+----------+-----+------+
#| x| date|value|result|
#+---+----------+-----+------+
#| ll|2020-01-05| 10| 50|
#| ll|2020-01-06| 10| 50|
#| ll|2020-01-07| 10| 60|
#| ll|2020-01-08| 10| 60|
#| ll|2020-01-09| 10| 50|
#| ll|2020-01-10| 10| 40|
#| ll|2020-01-11| 20| 30|
#| ll|2020-01-12| 10| 10|
#| ll|2020-01-05| 30| 160|
#| ll|2020-01-06| 30| 140|
#| ll|2020-01-07| 30| 120|
#| ll|2020-01-08| 40| 100|
#| ll|2020-01-09| 30| 60|
#| ll|2020-01-10| 10| 30|
#| ll|2020-01-11| 10| 20|
#| ll|2020-01-12| 10| 10|
#| yy|2020-01-05| 20| 140|
#| yy|2020-01-06| 20| 140|
#| yy|2020-01-07| 20| 140|
#| yy|2020-01-08| 20| 120|
#| yy|2020-01-09| 20| 100|
#| yy|2020-01-10| 40| 80|
#| yy|2020-01-11| 20| 40|
#| yy|2020-01-12| 20| 20|
#| yy|2020-01-05| 40| 240|
#| yy|2020-01-06| 40| 260|
#| yy|2020-01-07| 40| 260|
#| yy|2020-01-08| 40| 220|
#| yy|2020-01-09| 40| 180|
#| yy|2020-01-10| 40| 140|
#| yy|2020-01-11| 60| 100|
#| yy|2020-01-12| 40| 40|
#+---+----------+-----+------+

此外,在您的示例中,第 9 行应该是 160,而不是 170。

关于pyspark - 使用 pyspark 对列上的值求和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61681295/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com