
scala - Spark taking too long on HadoopRDD: Input split


I am running logistic regression with SGD on a large libsvm file. The file is about 10 GB and contains 40 million training examples.

When I run my Scala code with spark-submit, I notice that Spark spends a lot of time logging messages like these:

18/02/07 04:44:50 INFO HadoopRDD: Input split: file:/ebs2/preprocess/xaa:234881024+33554432

18/02/07 04:44:51 INFO Executor: Finished task 6.0 in stage 1.0 (TID 7). 875 bytes result sent to driver

18/02/07 04:44:51 INFO TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, localhost, executor driver, partition 8, PROCESS_LOCAL, 7872 bytes)

18/02/07 04:44:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 1025 ms on localhost (executor driver) (7/307)

Why does Spark emit so many of these "HadoopRDD: Input split" messages? What is their purpose, and how can I speed up or get rid of this step?

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import scala.compat.Platform._


object test {

  def main(args: Array[String]) {

    val nnodes = 1
    val epochs = 3

    val conf = new SparkConf().setAppName("Test Name")
    val sc = new SparkContext(conf)

    // Load the training and test sets (262165 features, 4 partitions each).
    val t0 = currentTime
    val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
    val test = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xab", 262165, 4)
    val t1 = currentTime

    println("START")

    // Logistic regression trained with SGD: tiny mini-batches, many iterations.
    val lrAlg = new LogisticRegressionWithSGD()
    lrAlg.optimizer.setMiniBatchFraction(10.0 / 40000000.0)
    lrAlg.optimizer.setNumIterations(12000000)
    lrAlg.optimizer.setStepSize(0.01)

    val model = lrAlg.run(train)

    // Score the test set and compute the area under the ROC curve.
    model.clearThreshold()
    val scoreAndLabels = test.map { point =>
      val score = model.predict(point.features)
      (score, point.label)
    }

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics.areaUnderROC()
    println("Area under ROC = " + auROC)
  }
}

Best answer

I fixed the speed problem by running

train = train.coalesce(1)
train.cache()

and by increasing the memory to 64 GB. Previously Spark probably could not cache the RDD properly because there was not enough RAM.
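
For reference, here is a minimal sketch of how that fix could be applied to the loading step of the original program. The file path and feature count are taken from the question; the spark-submit memory flags in the comment are only one common way to raise the memory to 64 GB and are an assumption, not part of the original answer.

// A sketch of the accepted fix: coalesce the training data and cache it
// before the iterative SGD training touches it.
//
// Assumed launch command (not from the original answer):
//   spark-submit --driver-memory 64g --executor-memory 64g --class CoalesceAndCache app.jar

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils

object CoalesceAndCache {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Coalesce And Cache")
    val sc = new SparkContext(conf)

    // Load the training set, shrink it to a single partition, and cache it so
    // the repeated SGD passes reuse the in-memory copy instead of re-reading
    // (and re-splitting) the 10 GB file on every iteration.
    val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
      .coalesce(1)
      .cache()

    // Force materialization once so the file-reading cost is paid up front.
    println(s"Cached ${train.count()} training examples")
  }
}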

Regarding "scala - Spark taking too long on HadoopRDD: Input split", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48656342/
