gpt4 book ai didi

sql - PySpark/Spark 窗口函数第一期/最后一期

转载 作者:行者123 更新时间:2023-12-02 03:41:06 25 4
gpt4 key购买 nike

据我了解,Spark 中的第一个/最后一个函数将检索每个分区的第一行/最后一行/我无法理解为什么 LAST 函数给出不正确的结果。

这是我的代码。

AgeWindow = Window.partitionBy('Dept').orderBy('Age')
df1 = df1.withColumn('first(ID)', first('ID').over(AgeWindow))\
.withColumn('last(ID)', last('ID').over(AgeWindow))
df1.show()
+---+----------+---+--------+--------------------------+-------------------------+
|Age| Dept| ID| Name|first(ID) |last(ID) |
+---+----------+---+--------+--------------------------+-------------------------+
| 38| medicine| 4| harry| 4| 4|
| 41| medicine| 5|hermione| 4| 5|
| 55| medicine| 7| gandalf| 4| 7|
| 15|technology| 6| sirius| 6| 6|
| 49|technology| 9| sam| 6| 9|
| 88|technology| 1| sam| 6| 2|
| 88|technology| 2| nik| 6| 2|
| 75| mba| 8| ginny| 8| 11|
| 75| mba| 10| sam| 8| 11|
| 75| mba| 3| ron| 8| 11|
| 75| mba| 11| ron| 8| 11|
+---+----------+---+--------+--------------------------+-------------------------+

最佳答案

这句话并没有错。您的窗口定义并不是您想象的那样。

如果您提供ORDER BY子句,则默认框架为RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:

from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

w = Window.partitionBy('Dept').orderBy('Age')

df = spark.createDataFrame(
[(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
("Age", "Dept", "ID")
)

df.select(
"*",
first('ID').over(w).alias("first_id"),
last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]

这意味着窗口函数永远不会向前查看,并且框架中的最后一行是当前行。

您应该将窗口重新定义为

w_uf = (Window
.partitionBy('Dept')
.orderBy('Age')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = df.select(
"*",
first('ID').over(w_uf).alias("first_id"),
last('ID').over(w_uf).alias("last_id")
)
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age| Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine| 4| 4| 7|
| 41|medicine| 5| 4| 7|
| 55|medicine| 7| 4| 7|
+---+--------+---+--------+-------+

关于sql - PySpark/Spark 窗口函数第一期/最后一期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52273186/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com