gpt4 book ai didi

apache-spark - 顺序 RDD 处理中的函数式方法 [Apache Spark]

转载 作者:行者123 更新时间:2023-12-02 01:41:33 24 4
gpt4 key购买 nike

我有一个 RDD,连接到一个 HBase 表。每行(键)代表一个 GPS 位置。现在我写了一个函数来计算两点之间的距离。应使用当前行及其前导 [i-1] 调用该函数

现在我正在努力通过 RDD 函数以功能性方式完成这项工作,以便我可以将其并行化。

我的快速而肮脏的方法是首先创建一个数组

val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int])
var i = 0 //can be better solved in scala, I know ;)

rows.foreach(row => {
if (predecssorPoint == null) {
predecssorPoint = getPointByRow(row._2)
}
else {
currentPoint = getPointByRow(row._2)
rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint)

i += 1
predecssorPoint = currentPoint
}
})

return rowArray

然后我将数组并行化并计算距离

  //create a parallel-enabled data set
val parallelDataSet = sc.parallelize(rows)

parallelDataSet.foreach(row => {
Functions.logDistance(row)
})

这行得通,但很丑陋,而且效率肯定很低。

我知道的想法是使用 rdd.reduce() 摆脱 foreach 循环,如果距离函数处理无法保证 (a+b) 的顺序的问题,这可能会起作用。

请问还有更好的办法吗?我的理解是,在使用 RDD 时不可能进行(有效的)索引访问。

谢谢。

最佳答案

鉴于排序是这里的关键,一个好的方法可能是首先索引 RDD。然后,使用索引,我们可以模拟一个 zip 并将元组分区到集群上。像这样:

val indexed = rdd.zipWithIndex.map(_.swap) // 
val shifted = indexed.map{case (k,v) => (k-1,v)}
val joined = indexed.join(shifted)
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)}

(*) 示例代码 - 未测试

关于apache-spark - 顺序 RDD 处理中的函数式方法 [Apache Spark],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28236347/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com