gpt4 book ai didi

performance - Spark 性能 - 如何并行化大循环?

转载 作者:行者123 更新时间:2023-12-02 04:15:21 27 4
gpt4 key购买 nike

我有一个 Spark 应用程序,总共包含 8000 个循环,它运行在 5 个节点的集群上。每个节点有125GB内存和32个核心。相关代码如下所示:

for (m <- 0 until deviceArray.size) { // there are 1000 device 
var id = deviceArray(m)

for (t <- 1 to timePatterns) { // there are 8 time patterns
var hrpvData = get24HoursPVF(dataDF, id, t).cache()

var hrpvDataZI = hrpvData.zipWithIndex

var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)

var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
var clusterPVMap = hrpvDataZI.zip(clusterPVPred)

var pvhgmRDD = clusterPVMap.map{r => (r._2, r._1._2)}.groupByKey

var arrHGinfo = pvhgmRDD.collect

// Post process data
// .....

hrpvData.unpersist()
}
}

函数调用get24HoursPVF()为k-means准备特征向量,大约需要40秒。每个循环大约需要 50 秒才能完成集群的使用。我的数据大小为 2 到 3 GB(从表中读取)。给定 8000 个循环,运行此 Spark 应用程序的总时间是 Not Acceptable (8000x50s)。

由于每个设备都是独立的,有没有办法并行化8000次迭代?或者说如何利用集群来解决总运行时间过长的问题? Scala Future 不起作用,因为它只是几乎同时提交作业,但 Spark 不会同时运行这些作业。

最佳答案

除了 for 循环之外,您的代码中还有 Spark 中最慢的 2 个 API 调用 - groupByKeycollect

几乎不应该使用groupByKey,而是查看reduceByKey,请参阅此Databricks blog了解更多详情。

collect 将该 RDD 中的所有数据传输到驱动程序节点上的数组,除非数据量很小,否则会对性能产生相当大的影响。

关于 for 循环,我不是特别熟悉你想要做什么,但是在

var hrpvData = get24HoursPVF(dataDF, id, t).cache()

您正在为每个 id 和 t 值构建并缓存一个新的数据帧。我不知道为什么你不能在一开始就构建一个包含 id 和 t 的每个变体的数据帧,然后在整个数据帧上运行 zipWithIndex、map 等?

关于performance - Spark 性能 - 如何并行化大循环?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34150007/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com