gpt4 book ai didi

apache-spark - PySpark:如何重新采样频率

转载 作者:行者123 更新时间:2023-12-04 04:23:07 26 4
gpt4 key购买 nike

想象一个由来自变量的值观察组成的 Spark 数据帧。每个观察都有一个特定的时间戳,并且这些时间戳在不同变量之间是不同的。这是因为时间戳是在变量的值发生变化并被记录时生成的。

#Variable     Time                Value
#852-YF-007 2016-05-10 00:00:00 0
#852-YF-007 2016-05-09 23:59:00 0
#852-YF-007 2016-05-09 23:58:00 0

问题 我想使用前向填充将所有变量放入相同的频率(例如 10 分钟)。为了可视化这一点,我从“Python for Data Analysis”一书中复制了一页。 问题:如何在 中的 Spark 数据帧上执行此操作高效 方式?

Python for Data Analysis

最佳答案

Question: How to do that on a Spark Dataframe in an efficient way?


Spark DataFrame 对于这样的操作来说根本不是一个好的选择。一般来说,SQL 原语的表达能力不够强,而且 PySpark DataFrame 不提供实现它所需的低级访问。
虽然可以使用纪元/时间戳算法轻松表示重新采样。有了这样的数据:
from pyspark.sql.functions import col, max as max_, min as min_

df = (spark
.createDataFrame([
("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
["ts", "val"])
.withColumn("ts", col("ts").cast("date").cast("timestamp")))
我们可以重新采样输入:
day = 60 * 60 * 24
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day

with_epoch = df.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
并加入引用:
# Reference range 
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("ts_resampled", col("epoch").cast("timestamp"))
.show(15, False))

## +----------+---------------------+------+---------------------+
## |epoch |ts |val |ts_resampled |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null |null |2012-06-13 02:00:00.0|
## |1339632000|null |null |2012-06-14 02:00:00.0|
## |1339718400|null |null |2012-06-15 02:00:00.0|
## |1339804800|null |null |2012-06-16 02:00:00.0|
## |1339891200|null |null |2012-06-17 02:00:00.0|
## |1339977600|null |null |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null |null |2012-06-20 02:00:00.0|
## |1340236800|null |null |2012-06-21 02:00:00.0|
## |1340323200|null |null |2012-06-22 02:00:00.0|
## |1340409600|null |null |2012-06-23 02:00:00.0|
## |1340496000|null |null |2012-06-24 02:00:00.0|
## |1340582400|null |null |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+
在 Spark >= 3.1 中替换
col("epoch").cast("timestamp")
from pyspark.sql.functions import timestamp_seconds

timestamp_seconds("epoch")
使用低级 API 可以像我在对 Spark / Scala: forward fill with last observation 的回答中显示的那样填充数据。使用 RDD,我们还可以避免两次混洗数据(一次用于连接,一次用于重新排序)。
但这里还有更重要的问题。当问题可以简化为按元素或按分区进行计算时,Spark 的性能最佳。虽然前向填充是可能的情况,但据我所知,常用的时间序列模型通常不是这种情况,如果某些操作需要顺序访问,那么 Spark 根本不会提供任何好处。
因此,如果您使用大到需要分布式数据结构的系列,您可能希望将其聚合到某个可以由单台机器轻松处理的对象,然后使用您最喜欢的非分布式工具来处理其余部分。
如果您使用多个时间序列,每个时间序列都可以在内存中处理,那么当然有 sparkts ,但我知道您已经意识到这一点。

关于apache-spark - PySpark:如何重新采样频率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39271374/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com