gpt4 book ai didi

apache-spark - pyspark:使用时间序列数据的滚动平均值

转载 作者:行者123 更新时间:2023-12-03 21:20:45 24 4
gpt4 key购买 nike

我有一个由时间戳列和美元列组成的数据集。我想找到以每行的时间戳结束的每周平均美元数。我最初正在查看 pyspark.sql.functions.window 函数,但它按周对数据进行分类。

下面是一个例子:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

这导致两条记录:
|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|

窗口函数对时间序列数据进行分箱,而不是执行滚动平均。

有没有办法执行滚动平均值,我将获得每行的每周平均值,时间段结束于该行的 timestampGMT?

编辑:

下面张的回答接近我想要的,但不完全是我想看到的。

这是一个更好的例子来展示我想要得到的东西:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

这导致以下数据帧:
dollars timestampGMT            rolling_average
25 2017-03-18 11:27:18.0 25
17 2017-03-10 15:27:18.0 15
13 2017-03-15 12:27:18.0 15

我希望在timestampGMT 列中处理日期的平均值超过一周,这将导致:
dollars timestampGMT            rolling_average
17 2017-03-10 15:27:18.0 17
13 2017-03-15 12:27:18.0 15
25 2017-03-18 11:27:18.0 19

在上面的结果中,2017-03-10 的 rolling_average 是 17,因为没有前面的记录。 2017-03-15 的滚动平均值为 15,因为它是 2017-03-15 的 13 和 2017-03-10 的 17 的平均值,后者落在前 7 天窗口内。 2017-03-18 的滚动平均值为 19,因为它是 2017-03-18 的 25 和 2017-03-10 的 13 的平均值,后者落在前 7 天窗口内,并且不包括 2017 的 17 -03-10 因为这不属于前 7 天的窗口。

有没有办法做到这一点,而不是每周窗口不重叠的分箱窗口?

最佳答案

我想出了使用此 stackoverflow 计算移动/滚动平均值的正确方法:

Spark Window Functions - rangeBetween dates

基本思想是将时间戳列转换为秒,然后您可以使用 pyspark.sql.Window 类中的 rangeBetween 函数在窗口中包含正确的行。

这是已解决的示例:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))

这导致我正在寻找的滚动平均值的确切列:
dollars   timestampGMT            rolling_average
17 2017-03-10 15:27:18.0 17.0
13 2017-03-15 12:27:18.0 15.0
25 2017-03-18 11:27:18.0 19.0

关于apache-spark - pyspark:使用时间序列数据的滚动平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45806194/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com