gpt4 book ai didi

scala - Spark RDD : filling inregular time series

转载 作者:行者123 更新时间:2023-12-01 09:06:46 25 4
gpt4 key购买 nike

所以我有一个包含不规则时间序列数据的 RDD:

1,
4、<值4>
6、<值6>
..等

我需要将其填充到常规时间序列中:

1,
2、<值1>
3、<值1>
4、<值4>
5、<值4>
6、<值6>
..等

到目前为止,我已经创建了一个包含 1,2,3,4,5,6,.. 的 RDD,然后将其 leftOuterJoin'ed 到原始 RDD,这给了我:

1,
2、<无>
3、<无>
4、<值4>
5、<无>
6、<值6>
..等

所以我面临的问题是用先前非空行的值填充那些 2、3、5。

我更愿意在 RDD 级别上执行此操作而不使用 sparkSQL,这当然是不得已的选择。转到 scala 数组级别并不是很吸引人,因为出于性能问题,我更愿意将其保持在 RDD 级别。

谢谢

最佳答案

无需初始 join 的相对简单的解决方案。让我们从虚拟数据和辅助函数开始:

val rdd = sc.parallelize(Seq(
(3L, 1.0), (1L, 4.0), (5L, 3.6), (7L, 0.2), (8L, 0.0)))

def fillTimePoints(xs: Array[(Long, Double)]) = xs match {
case Array((xTime, xValue), (yTime, _)) => {
val diff = yTime - xTime

if (diff == 0) Seq((xTime, xValue))
else (xTime, xValue) +: (1 until diff.toInt)
.map(_.toLong)
.map(i => (i + xTime, xValue))
}

case _ => Seq.empty[(Long, Double)]
}

现在剩下的就是在排序的 RDD 上滑动:

import org.apache.spark.mllib.rdd.RDDFunctions._

rdd.sortBy(_._1).sliding(2).flatMap(fillTimePoints).collect

// Array[(Long, Double)] = Array((1,4.0), (2,4.0), (3,1.0),
// (4,1.0), (5,3.6), (6,3.6), (7,0.2))

注意事项:

  • 滑动 是开发者 API 的一部分。其类中的大多数方法在最近的版本中已被弃用。仍然可以从头开始编写代码,但现在它应该可以工作,

  • 您可能更喜欢使用 RangePartitioner 后跟 repartitionAndSortWithinPartitions 而不是排序。然后,您可以使用 mapPartitionspreservePartitioning 设置为 true 来应用局部滑动,最后填补空白(再次 preservePartitioning)。它需要更多工作,但您可以使用 RangePartitioner 对输出进行分区,这很有用。

关于scala - Spark RDD : filling inregular time series,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33294182/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com