
scala - Why does this LR code run so slowly on Spark?


Because MLlib does not support sparse input, I am running the following code, which handles a sparse input format, on a Spark cluster. The setup is:

  1. 5 nodes, each with 8 cores (while the code runs, every CPU on every node is at 100% utilization, about 98% of it in user mode).
  2. Input: 10,000,000+ instances with 600,000+ dimensions, stored on HDFS

The code is:

import java.util.Random
import scala.collection.mutable.HashMap
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Vector
import java.lang.Math
import org.apache.spark.broadcast.Broadcast

object SparseLR {
  val lableNum = 1
  val dimNum = 632918
  val iteration = 10
  val alpha = 0.1   // learning rate
  val lambda = 0.1  // L2 regularization strength
  val rand = new Random(42)
  var w = Vector(dimNum, _ => rand.nextDouble)  // dense weight vector

  // Sparse vector backed by a mutable HashMap from index to value
  class SparserVector {
    var elements = new HashMap[Int, Double]

    def insert(index: Int, value: Double) {
      elements += index -> value
    }

    // Scale this sparse vector and expand it into a dense Vector
    def *(scale: Double): Vector = {
      val x = new Array[Double](dimNum)
      elements.keySet.foreach(k => x(k) = scale * elements.get(k).get)
      Vector(x)
    }
  }

  case class DataPoint(x: SparserVector, y: Int)

  // Parse a line of the form "label<TAB>index:value<TAB>index:value..."
  def parsePoint(line: String): DataPoint = {
    val features = new SparserVector
    val fields = line.split("\t")
    //println("fields:" + fields(0))
    val y = fields(0).toInt
    fields.filter(_.contains(":")).foreach { f =>
      val feature = f.split(":")
      features.insert(feature(0).toInt, feature(1).toDouble)
    }
    DataPoint(features, y)
  }

  // Per-point gradient contribution, expanded into a dense Vector
  def gradient(p: DataPoint, w: Broadcast[Vector]): Vector = {
    // Logistic function applied to the sparse dot product w . x
    def h(w: Broadcast[Vector], x: SparserVector): Double = {
      val wb = w.value
      val features = x.elements
      val s = features.keySet.map(k => features.get(k).get * wb(k)).reduce(_ + _)
      1 / (1 + Math.exp(-p.y * s))
    }
    p.x * (-(1 - p.y * h(w, p.x)))
  }

  def train(sc: SparkContext, dataPoints: RDD[DataPoint]) {
    //val sampleNum = dataPoints.count
    val sampleNum = 11680250

    for (i <- 0 until iteration) {
      val wb = sc.broadcast(w)
      val g = (dataPoints.map(p => gradient(p, wb)).reduce(_ + _) + lambda * wb.value) / sampleNum
      w -= alpha * g

      println("iteration " + i + ": g = " + g)
    }
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.executor.memory", "15g")
    System.setProperty("spark.default.parallelism", "32")
    val sc = new SparkContext("spark://xxx:12036", "LR", "/xxx/spark", List("xxx_2.9.3-1.0.jar"))
    val lines = sc.textFile("hdfs:xxx/xxx.txt", 32)

    val trainset = lines.map(parsePoint _).cache()

    train(sc, trainset)
  }
}
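For reference, parsePoint implies that each input line is a label followed by tab-separated index:value feature pairs. A hypothetical example line (indices and values invented purely for illustration; the fields are separated by tabs):

1	3:0.5	17:1.2	632000:0.8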

Can anyone help me? Thanks!

Best Answer

It is really hard to give you a definitive answer on this. Maybe the question would be a better fit for the Code Review StackExchange site?

Some obvious things:

Your gradient function looks inefficient. When you want to do something for every key/value pair of a map, it is much more efficient to write

for ((k, v) <- map) {
  ...
}

than

for (k <- map.keySet) {
  val value = map.get(k).get
  ...
}

The second form performs an extra hash lookup (plus an Option allocation) for every key. Also, for performance-critical code like this, it is preferable to change the reduce into accumulation of a mutable value. The rewritten gradient function would then be:

def gradient(p: DataPoint, w: Broadcast[Vector]): Vector = {
  // Accumulate the sparse dot product in a mutable var instead of map/reduce
  def h(w: Broadcast[Vector], x: SparserVector): Double = {
    val wb = w.value
    val features = x.elements
    var s = 0.0
    for ((k, v) <- features)
      s += v * wb(k)
    1 / (1 + Math.exp(-p.y * s))
  }
  p.x * (-(1 - p.y * h(w, p.x)))
}

Now if you want to squeeze out even more performance, you will have to change SparseVector to use an array of indices and an array of values instead of a Map[Int, Double]. The reason is that in a Map, keys and values are boxed as objects with considerable overhead, while an Array[Int] or Array[Double] is a single compact chunk of memory.

(For convenience, it might be best to define a builder that uses a SortedMap[Int, Double] while building and converts to the two arrays when finished; see the sketch after the class below.)

class SparseVector(val indices: Array[Int], val values: Array[Double]) {
  require(indices.length == values.length)

  // Scale and expand into a dense Vector, walking the two parallel arrays
  def *(scale: Double): Vector = {
    val x = new Array[Double](dimNum)
    var i = 0
    while (i < indices.length) {
      x(indices(i)) = scale * values(i)
      i += 1
    }
    Vector(x)
  }
}
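A minimal sketch of such a builder, assuming the SparseVector class above (the SparseVectorBuilder name and its API are hypothetical, not part of the original answer):

import scala.collection.immutable.SortedMap

// Hypothetical builder: collect entries in a SortedMap so the index array
// comes out sorted, then copy keys and values into two compact arrays.
class SparseVectorBuilder {
  private var entries = SortedMap.empty[Int, Double]

  def insert(index: Int, value: Double): this.type = {
    entries += index -> value
    this
  }

  def build(): SparseVector =
    new SparseVector(entries.keys.toArray, entries.values.toArray)
}

With this, parsePoint would insert into the builder while scanning the line and call build() once at the end, instead of mutating a HashMap-backed vector.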

Note that the code samples above are untested, but I think you will get the idea.

Regarding "scala - Why does this LR code run so slowly on Spark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/20768788/
