gpt4 book ai didi

scala - Spark.mllib 中的并行性

转载 作者:行者123 更新时间:2023-12-02 00:31:24 25 4
gpt4 key购买 nike

假设我有一个Array[RDD]类型的对象data。我想学习此对象中每个 RDD 上的独立机器学习模型。例如,对于随机森林:

data.map{ d => RandomForest.trainRegressor(d,2,Map[Int,Int](),2,"auto","gini",2,10) }

当我使用 spark-submit --master yarn-client ... 启动此作业时,独立学习任务似乎并未在多个节点上并行化。几乎所有工作仅由一个节点(即此处的节点 10)完成,如应用程序 UI 的屏幕截图所示:

enter image description here

附录

为了完整起见,整个代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest


object test {
def main(args: Array[String]) {

val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// Load data
val rawData = sc.textFile("data/mllib/sample_tree_data.csv")
val data = rawData.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
}

val CV_data = (1 to 100).toArray.map(_ => {val splits = data.randomSplit(Array(0.7, 0.3)) ; splits(0)})

CV_data.map(d => RandomForest.trainClassifier(d, 2, Map[Int, Int](), 2, "sqrt", "gini", 2, 100))

sc.stop()
System.exit(0)
}
}

最佳答案

问题在于,RandomForest.trainClassifier可以被视为一个 Action ,因为它急切地触发了一些涉及的RDD计算的执行。因此,每当您调用 RandomForest.trainClassifier 时,Spark 作业都会提交到集群并执行。

由于 Scala Array 上的 map 操作是按顺序执行的,因此您最终会执行一个又一个的 trainClassifier 作业。为了并行执行作业,您必须在并行集合上调用map。下面的代码片段应该可以解决问题:

CV_data.par.map(d => RandomForest.trainClassifier(d, 2, Map[Int, Int](), 2, "sqrt", "gini", 2, 100))

关于scala - Spark.mllib 中的并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34313090/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com