gpt4 book ai didi

python - 最后 N 个数据点的 Pyspark 结构化流窗口(移动平均)

转载 作者:行者123 更新时间:2023-12-05 07:14:34 25 4
gpt4 key购买 nike

我使用 Pyspark Structured Streaming 2.4.4 从 kafka 主题中读取了几个数据帧。我想向该数据框添加一些新列,这些列主要基于过去 N 个数据点的窗口计算(例如:最近 20 个数据点的移动平均值),并且随着新数据点的传递,相应的值MA_20 应该立即计算出来。

数据可能是这样的:时间戳 |波动指数

2020-01-22 10:20:32 | 13.05
2020-01-22 10:25:31 | 14.35
2020-01-23 09:00:20 | 14.12

值得一提的是,数据将从周一到周五每天 8 小时内接收。因此,周一早上计算的移动平均线应该包括周五的数据!

我尝试了不同的方法,但仍然无法实现我想要的。

windows = df_vix \
.withWatermark("Timestamp", "100 minutes") \
.groupBy(F.window("Timestamp", "100 minute", "5 minute")) \

aggregatedDF = windows.agg(F.avg("VIX"))

前面的代码计算了 MA,但它会将周五的数据视为迟到的数据,因此它们将被排除在外。比最后 100 分钟更好的应该是最后 20 分(间隔 5 分钟)。

我认为我可以使用 rowsBetween 或 rangeBetween,但在流式数据帧窗口中不能应用于非时间戳列 (F.col('Timestamp').cast('long'))

    w = Window.orderBy(F.col('Timestamp').cast('long')).rowsBetween(-600, 0)

df = df_vix.withColumn('MA_20', F.avg('VIX').over(w)

)

但另一方面,使用 rowsBetween(- minutes(20), 0) 无法在 rowsBetween() 中指定间隔 throws: minutes are not defined (sql.functions 中没有这样的函数)

我找到了另一种方法,但它也不适用于流数据帧。不知道为什么会出现 'Non-time-based windows are not supported on streaming DataFrames' 错误(df_vix.Timestamp 是时间戳类型)

df.createOrReplaceTempView("df_vix")

df_vix.createOrReplaceTempView("df_vix")
aggregatedDF = spark.sql(
"""SELECT *, mean(VIX) OVER (
ORDER BY CAST(df_vix.Timestamp AS timestamp)
RANGE BETWEEN INTERVAL 100 MINUTES PRECEDING AND CURRENT ROW
) AS mean FROM df_vix""")

我不知道我还能用什么来计算简单的移动平均线。看起来在 Pyspark 中不可能实现这一点......也许更好的解决方案是每次新数据将整个 Spark 数据帧传输到 Pandas 并计算 Pandas 中的所有内容时进行转换(或将新行附加到 Pandas 并计算 MA)? ??

我认为随着新数据的到来创建新功能是结构化流的主要目的,但事实证明 Pyspark 不适合这个,我正在考虑放弃 Pyspark 转向 Pandas ...

编辑

尽管 df_vix.Timestamp 类型为“时间戳”,但以下内容也不起作用,但它会抛出“流式数据帧不支持非基于时间的窗口”错误。

w = Window.orderBy(df_vix.Timestamp).rowsBetween(-20, -1)
aggregatedDF = df_vix.withColumn("MA", F.avg("VIX").over(w))

最佳答案

你看过window了吗?在事件时间操作? window(timestamp, "10 minutes", "5 minutes") 将每 5 分钟为您提供一个 10 分钟的数据帧,然后您可以对其进行聚合,包括移动平均数。

关于python - 最后 N 个数据点的 Pyspark 结构化流窗口(移动平均),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59877772/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com