
java - Spark : reduce causes StackOverflowError


I have just started programming with Spark 0.7.2 and Scala 2.9.3. I am testing a machine learning algorithm on a standalone machine, and the last step of the algorithm requires computing the MSE (mean squared error) between two matrices, i.e. ||A - M||^2, where the subtraction is element-wise. Since A is potentially very large and sparse, we store the matrices as (key, value) pairs, where the key is the coordinate (i, j) and the value is the tuple of the corresponding elements of A and M, i.e. (A_ij, M_ij). The overall ML algorithm is gradient descent, so on every iteration we compute the MSE and test it against a threshold. If I skip the per-iteration MSE computation, the whole program runs fine. The program looks like this:

val ITERATIONS = 100
for (i <- 1 to ITERATIONS) {
  ... // calculate M for each iteration
  val mse = A.map { x =>
    val A_ij = x._2(0)
    val M_ij = x._2(1)
    (A_ij - M_ij) * (A_ij - M_ij)
  }.reduce(_+_)
  ...
}
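
For concreteness, here is a minimal sketch of the (key, value) layout described above, with made-up values; it assumes sc is an already-created SparkContext. Each entry maps a coordinate to an Array holding (A_ij, M_ij), which is exactly what the map/reduce above subtracts and squares:

// Sketch only: a tiny hand-built A in the layout described above.
// key = (i, j) coordinate, value = Array(A_ij, M_ij) -- the values are made up
val entries: Seq[((Double, Double), Array[Double])] = Seq(
  ((0.0, 0.0), Array(1.0, 0.8)),
  ((0.0, 1.0), Array(2.0, 2.3)),
  ((1.0, 1.0), Array(0.5, 0.4))
)
val A = sc.parallelize(entries)   // sc: SparkContext, assumed to exist
val mse = A.map { x => (x._2(0) - x._2(1)) * (x._2(0) - x._2(1)) }.reduce(_ + _)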

The program runs for at most about 45 iterations and then crashes with the following Spark exception:

[error] (run-main) spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)

Another observation is that the running time grows by roughly 5% with each iteration. Also, without the "reduce(_ + _)" there is no StackOverflowError. I tried increasing the level of parallelism up to the total number of available hardware threads, but that did not help.
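
The per-iteration slowdown, together with the deeply recursive ObjectInputStream frames in the trace below, looks consistent with an RDD lineage that grows on every iteration. Purely as a sketch of that idea (it assumes sc.setCheckpointDir / RDD.checkpoint() are available in the Spark build used here, which has not been verified for 0.7.2; the directory and the interval are placeholders), breaking the lineage every few iterations would look like this:

sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder directory

for (i <- 1 to ITERATIONS) {
  // ... update A for this iteration ...
  if (i % 10 == 0) {     // arbitrary interval, for illustration only
    A.checkpoint()       // cut the lineage back to the checkpointed data
    A.count()            // force the checkpoint to materialize
  }
  val mse = A.map { x =>
    val d = x._2(0) - x._2(1)
    d * d
  }.reduce(_ + _)
}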

I would greatly appreciate any pointers that help me track down the root cause of this stack overflow error.

Edit:

  1. A is of type Spark.RDD[((Double, Double), Array[Double])]
  2. The StackOverflowError, where the frames starting at "at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)" repeat 61 times:

    13/06/26 00:44:41 ERROR LocalScheduler: Exception in task 0
    java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:77)
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:54)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1849)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:351)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
  3. The main iteration code:

Some of the utility functions it uses are included further below.

while (i <= ITERATION && err >= THRESHOLD) {
  // AW: group by row, then create key by col
  // split A by row
  // (col, (A_w_M_element, W_row_vector, (row, col)))
  AW = A.map(x =>
    (x._1._1, (x._1, x._2))
  ).cogroup(W).flatMap( x => {
    val wt_i = x._2._2(0)
    val A_i_by_j = x._2._1
    A_i_by_j.map( j => (j._1._2, (j._2, wt_i, j._1)))
  })

  // calculate the X = Wt*A
  X_i_by_j = AW.map( k =>
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // Y = Wt*M = Wt*WH at the same time
  Y_i_by_j = AW.map( k =>
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // X ./ Y
  X_divide_Y = X_i_by_j.join(Y_i_by_j).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // H = H .* X_divide_Y
  H = H.join(X_divide_Y).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )

  // Update M = WH
  // M = matrix_multi_local(AW, H)
  A = AW.join(H).map( x => {
    val orig_AwM = x._2._1._1
    val W_row = x._2._1._2
    val cord = x._2._1._3
    val H_col = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })

  // split M into two intermediate matrices (one by row, and the other by col)

  /*val M_by_i = M.map(x =>
    (x._1._1, (x._1, x._2))
  )
  val M_by_j = M.map(x =>
    (x._1._2, (x._1, x._2))
  )*/

  // AH: group by col, then create key by row
  // Divide A by row first
  // val AH = matrix_join_local(M_by_j, H)
  AH = A.map(x =>
    (x._1._2, (x._1, x._2))
  ).cogroup(H).flatMap( x => {
    val H_col = x._2._2(0)
    val AM_j_by_i = x._2._1
    AM_j_by_i.map( i => (i._1._1, (i._2, H_col, i._1)))
  })

  // calculate V = At*H
  V_j_by_i = AH.map( k =>
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // calculate U = Mt*H
  U_j_by_i = AH.map( k =>
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // V / U
  V_divide_U = V_j_by_i.join(U_j_by_i).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // W = W .* V_divide_U
  W = W.join(V_divide_U).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )

  // M = W*H
  A = AH.join(W).map( x => {
    val orig_AwM = x._2._1._1
    val H_col = x._2._1._2
    val cord = x._2._1._3
    val W_row = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })

  // Calculate the error
  // calculate the square of the difference
  err = A.map( x => (x._2(0) - x._2(2))*(x._2(0) - x._2(2))/A_len).reduce(_+_)
  println("At round " + i + ": MSE is " + err)
}

Some of the utility functions used:

def op_two_arrays (array1: Array[Double], array2: Array[Double], f: (Double, Double) => Double) : Array[Double] = {
  val len1 = array1.length
  val len2 = array2.length
  if (len1 != len2) {
    return null
  }
  // val new_array : Array[Double] = new Array[Double](len1)
  for (i <- 0 to len1 - 1) {
    array1(i) = f(array1(i), array2(i))
  }
  return array1
}

// element-wise operation
def add (a: Double, b: Double): Double = { a + b }

def multiple (a: Double, b: Double): Double = { a * b }

def divide (a: Double, b: Double): Double = {
  try {
    return a / b
  } catch {
    case x: ArithmeticException => {
      println("ArithmeticException: detect divide by zero")
      return Double.NaN
    }
  }
}

def array_sum (array: Array[Double]) : Double = {
  var sum: Double = 0.0
  for (i <- array) {
    sum += i
  }
  return sum
}

def dot_product (vec1: Array[Double], vec2: Array[Double]) : Double = {
  array_sum(op_two_arrays(vec1, vec2, multiple))
}
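
A tiny usage check for these helpers (with made-up values). One thing worth noting: op_two_arrays writes its result into its first argument, so the input array is mutated in place:

val v1 = Array(1.0, 2.0, 3.0)
val v2 = Array(4.0, 5.0, 6.0)
println(dot_product(v1, v2))   // prints 32.0
// side effect: v1 is now Array(4.0, 10.0, 18.0), because op_two_arrays
// overwrote array1 with the element-wise products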

Best Answer

I tried increasing the stack size, using spark.util.Vector, and localizing the utility functions, but unfortunately none of them solved the problem. Then I tried downgrading Spark from 0.7.2 to 0.6.3 ( https://github.com/mesos/spark/tree/branch-0.6 ). It now works even for a 10,000 x 10,000 matrix, and the Stack Overflow no longer appears. I don't know exactly how that fixed it, so I am posting the difference between the reduce functions in RDD.scala:

--- spark-0.6.3/core/src/main/scala/spark/RDD.scala 2013-06-27 11:31:12.628017194 -0700
+++ spark-0.7.2/core/src/main/scala/spark/RDD.scala 2013-06-27 13:42:22.844686240 -0700
@@ -316,39 +468,93 @@
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
+ // println("RDD.reduce: after sc.clean")
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
- }else {
+ } else {
None
}
}
- val options = sc.runJob(this, reducePartition)
- val results = new ArrayBuffer[T]
- for (opt <- options; elem <- opt) {
- results += elem
- }
- if (results.size == 0) {
- throw new UnsupportedOperationException("empty collection")
- } else {
- return results.reduceLeft(cleanF)
+ // println("RDD.reduce: after reducePartition")
+ var jobResult: Option[T] = None
+ val mergeResult = (index: Int, taskResult: Option[T]) => {
+ if (taskResult != None) {
+ jobResult = jobResult match {
+ case Some(value) => Some(f(value, taskResult.get))
+ case None => taskResult
+ }
+ }
}
+ // println("RDD.reduce: after jobResult")
+ sc.runJob(this, reducePartition, mergeResult)
+ // println("RDD.reduce: after sc.runJob")
+ // Get the final result out of our Option, or throw an exception if the RDD was empty
+ jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+ // println("RDD.reduce: finished")
}

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
def fold(zeroValue: T)(op: (T, T) => T): T = {
+ // Clone the zero value since we will also be serializing it as part of tasks
+ var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
- val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
- return results.fold(zeroValue)(cleanOp)
+ val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+ val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+ sc.runJob(this, foldPartition, mergeResult)
+ jobResult
}

Regarding java - Spark : reduce causes StackOverflowError, there is a corresponding question on Stack Overflow: https://stackoverflow.com/questions/17311733/
