gpt4 book ai didi

scala - 填补时间序列 Spark 中的空白

转载 作者:行者123 更新时间:2023-12-04 17:35:29 24 4
gpt4 key购买 nike

我在处理时间序列数据时遇到问题。由于电源故障,数据集中缺少一些时间戳。我需要通过添加行来填补这个空白,然后,我可以插入缺失的值。

输入数据:

periodstart                usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039

想要的输出:
periodstart                usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039

现在我已经在数据集 foreach 函数中使用 while 循环修复了这个问题。问题是我必须先将数据集收集到驱动程序,然后才能进行 while 循环。所以这不是 Spark 的正确方式。

有人可以给我一个更好的解决方案吗?

这是我的代码:
MissingMeasurementsDS.collect().foreach(row => {
// empty list for new generated measurements
val output = ListBuffer.empty[Measurement]
// Missing measurements
val missingMeasurements = row.getAs[Int]("missingmeasurements")
val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
//Generate missing timestamps
var i = 1
while (i <= missingMeasurements) {
//Increment timestamp with 15 minutes (900000 milliseconds)
val newTimestamp = lastTimestamp.getTime + (900000 * i)
output += Measurement(new Timestamp(newTimestamp), 0))
i += 1
}
//Join interpolated measurements with correct measurements
completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())

最佳答案

如果输入 DataFrame具有以下结构:

root
|-- periodstart: timestamp (nullable = true)
|-- usage: long (nullable = true)

斯卡拉

确定最小值/最大值:
val (minp, maxp) = df
.select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint")))
.as[(Long, Long)]
.first

设置步长,例如15分钟:
val step: Long = 15 * 60

生成引用范围:
val reference = spark
.range((minp / step) * step, ((maxp / step) + 1) * step, step)
.select($"id".cast("timestamp").alias("periodstart"))

加入并填补空白:
reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))

python

在 PySpark 中类似:

from pyspark.sql.functions import col, min as min_, max as max_

step = 15 * 60

minp, maxp = df.select(
min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()

reference = spark.range(
(minp / step) * step, ((maxp / step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))

reference.join(df, ["periodstart"], "leftouter")

关于scala - 填补时间序列 Spark 中的空白,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52095181/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com