gpt4 book ai didi

dataframe - pySpark - 在滚动窗口中获取最大值行

转载 作者:行者123 更新时间:2023-12-04 20:29:12 28 4
gpt4 key购买 nike

我有一个 pyspark 数据框,下面是示例行。我试图在 10 分钟内获得最大平均值。我正在尝试使用 Window 函数,但无法实现结果。

这是我的数据框,其中包含 30 分钟的随机数据。我希望输出 3 行,每 10 分钟输出 1 行。

+-------------------+---------+
| event_time|avg_value|
+-------------------+---------+
|2019-12-29 00:01:00| 9.5|
|2019-12-29 00:02:00| 9.0|
|2019-12-29 00:04:00| 8.0|
|2019-12-29 00:06:00| 21.0|
|2019-12-29 00:08:00| 7.0|
|2019-12-29 00:11:00| 8.5|
|2019-12-29 00:12:00| 11.5|
|2019-12-29 00:14:00| 8.0|
|2019-12-29 00:16:00| 31.0|
|2019-12-29 00:18:00| 8.0|
|2019-12-29 00:21:00| 8.0|
|2019-12-29 00:22:00| 16.5|
|2019-12-29 00:24:00| 7.0|
|2019-12-29 00:26:00| 14.0|
|2019-12-29 00:28:00| 7.0|
+-------------------+---------+

我正在使用下面的代码
window_spec = Window.partitionBy('event_time').orderBy('event_time').rangeBetween(-60*10,0)
new_df = data.withColumn('rank', rank().over(window_spec))
new_df.show()

但是这段代码给了我以下错误:
pyspark.sql.utils.AnalysisException: 'Window Frame specifiedwindowframe(RangeFrame, -600, currentrow$()) must match the required frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$());'

我想要的输出是
+-------------------+---------+
| event_time|avg_value|
+-------------------+---------+
|2019-12-29 00:06:00| 21.0|
|2019-12-29 00:16:00| 31.0|
|2019-12-29 00:22:00| 16.5|
+-------------------+---------+

有人可以帮我吗?

TIA。

最佳答案

您可以使用 groupBy window .

from pyspark.sql import functions as F
df.groupBy(F.window("event_time","10 minutes"))\
.agg(F.max("avg_value").alias("avg_value")).show()

#+--------------------+---------+
#| window|avg_value|
#+--------------------+---------+
#|[2019-12-29 00:20...| 16.5|
#|[2019-12-29 00:10...| 31.0|
#|[2019-12-29 00:00...| 21.0|
#+--------------------+---------+

获得 的准确输出event_time 您希望可以使用的列 collect_list , array_sort element_at ( spark2.4+ )
from pyspark.sql import functions as F
df.groupBy(F.window("event_time","10 minutes"))\
.agg(F.element_at(F.array_sort(F.collect_list("event_time")),-2).alias("event_time"),\
F.max("avg_value").alias("avg_value")).drop("window").orderBy("event_time").show()

#+-------------------+---------+
#|event_time |avg_value|
#+-------------------+---------+
#|2019-12-29 00:06:00|21.0 |
#|2019-12-29 00:16:00|31.0 |
#|2019-12-29 00:26:00|16.5 |
#+-------------------+---------+

UPDATE :
df.groupBy(F.window("event_time","10 minutes"))\
.agg(F.collect_list(F.struct("event_time","avg_value")).alias("event_time")\
,F.max("avg_value").alias("avg_value"))\
.withColumn("event_time", F.expr("""filter(event_time, x-> x.avg_value=avg_value)"""))\
.select((F.col("event_time.event_time")[0]).alias("event_time"),"avg_value").orderBy("event_time").show()

#+-------------------+---------+
#| event_time|avg_value|
#+-------------------+---------+
#|2019-12-29 00:06:00| 21.0|
#|2019-12-29 00:16:00| 31.0|
#|2019-12-29 00:22:00| 16.5|
#+-------------------+---------+

关于dataframe - pySpark - 在滚动窗口中获取最大值行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61392304/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com