- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我刚刚开始使用 Spark 0.7.2 和 Scala 2.9.3 进行编程。我正在一台独立机器上测试机器学习算法,该算法的最后一步需要计算两个矩阵之间的 MSE(均方误差),即 || A - M||^2,我们在两个矩阵之间进行元素减法。由于 A 的潜在大小非常大且稀疏,因此我们以(键,值)对的形式存储矩阵,其中键是坐标(i,j),值是 A 的相应元素的元组, M,即(A_ij,M_ij)。整个 ML 算法是梯度下降,因此对于每次迭代,我们计算 MSE 并针对特定阈值对其进行测试。然而,整个程序运行正常,无需计算每次迭代的MSE。该程序如下所示:
val ITERATIONS = 100
for (i <- 1 to ITERATIONS) {
... // calculate M for each iteration
val mse = A.map{ x =>
val A_ij = x._2(0)
val M_ij = x._2(1)
(A_ij - M_ij) * (A_ij - M_ij)
}.reduce(_+_)
...
}
该程序最多只能运行 45 次迭代,并且会崩溃并出现以下 Spark 异常:
[error] (run-main) spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
另一个观察结果是,每次迭代,运行时间都会增加约 5%。同样,如果没有“reduce( _ + _ )”,就不会有 StackOverflowError。我尝试增加可能的物理线程总数的并行度,但这没有帮助。
非常感谢任何人都可以指出一些方向,让我找出堆栈溢出错误的根本原因。
编辑:
stackoverflow异常,并且从“at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
”开始重复61次:
13/06/26 00:44:41 ERROR LocalScheduler: Exception in task 0
java.lang.StackOverflowError
at java.lang.Exception.<init>(Exception.java:77)
at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:54)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1849)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1947)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1753)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1329)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:351)
at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
主要迭代代码
一些实用函数包含在下一个列表元素中
while (i <= ITERATION && err >= THRESHOLD) {
// AW: group by row, then create key by col
// split A by row
// (col, (A_w_M_element, W_row_vector, (row, col)))
AW = A.map(x =>
(x._1._1, (x._1, x._2))
).cogroup(W).flatMap( x => {
val wt_i = x._2._2(0)
val A_i_by_j = x._2._1
A_i_by_j.map( j => (j._1._2, (j._2, wt_i, j._1)))
})
// calculate the X = Wt*A
X_i_by_j = AW.map( k =>
(k._1, k._2._2.map(_*k._2._1(0)))
).reduceByKey(op_two_arrays(_, _, add))
// Y = Wt*M = Wt*WH at the same time
Y_i_by_j = AW.map( k =>
(k._1, k._2._2.map(_*k._2._1(2)))
).reduceByKey(op_two_arrays(_, _, add))
// X ./ Y
X_divide_Y = X_i_by_j.join(Y_i_by_j).map(x =>
(x._1, op_two_arrays(x._2._1, x._2._2, divide))
)
// H = H .* X_divide_Y
H = H.join(X_divide_Y).map(x =>
(x._1, op_two_arrays(x._2._1, x._2._2, multiple))
)
// Update M = WH
// M = matrix_multi_local(AW, H)
A = AW.join(H).map( x => {
val orig_AwM = x._2._1._1
val W_row = x._2._1._2
val cord = x._2._1._3
val H_col = x._2._2
// notice that we include original A here as well
(cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
})
// split M into two intermediate matrix (one by row, and the other by col)
/*val M_by_i = M.map(x =>
(x._1._1, (x._1, x._2))
)
val M_by_j = M.map(x =>
(x._1._2, (x._1, x._2))
)*/
// AH: group by col, then create key by row
// Divide A by row first
// val AH = matrix_join_local(M_by_j, H)
AH = A.map(x =>
(x._1._2, (x._1, x._2))
).cogroup(H).flatMap( x => {
val H_col = x._2._2(0)
val AM_j_by_i = x._2._1
AM_j_by_i.map( i => (i._1._1, (i._2, H_col, i._1)))
})
// calculate V = At*H
V_j_by_i = AH.map( k =>
(k._1, k._2._2.map(_*k._2._1(0)))
).reduceByKey(op_two_arrays(_, _, add))
// calculate U = Mt*H
U_j_by_i = AH.map( k =>
(k._1, k._2._2.map(_*k._2._1(2)))
).reduceByKey(op_two_arrays(_, _, add))
// V / U
V_divide_U = V_j_by_i.join(U_j_by_i).map(x =>
(x._1, op_two_arrays(x._2._1, x._2._2, divide))
)
// W = W .* V_divide_U
W = W.join(V_divide_U).map(x =>
(x._1, op_two_arrays(x._2._1, x._2._2, multiple))
)
// M = W*H
A = AH.join(W).map( x => {
val orig_AwM = x._2._1._1
val H_col = x._2._1._2
val cord = x._2._1._3
val W_row = x._2._2
// notice that we include original A here as well
(cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
})
// Calculate the error
// calculate the sequre of difference
err = A.map( x => (x._2(0) - x._2(2))*(x._2(0) - x._2(2))/A_len).reduce(_+_)
println("At round " + i + ": MSE is " + err)
}
使用的一些实用函数:
def op_two_arrays (array1: Array[Double], array2: Array[Double], f: (Double, Double) => Double) : Array[Double] = {
val len1 = array1.length
val len2 = array2.length
if (len1 != len2) {
return null
}
// val new_array : Array[Double] = new Array[Double](len1)
for (i <- 0 to len1 - 1) {
array1(i) = f(array1(i), array2(i))
}
return array1
}
// element-wise operation
def add (a: Double, b: Double): Double = { a + b }
def multiple (a: Double, b: Double): Double = { a * b }
def divide (a: Double, b: Double): Double = {
try {
return a / b
} catch {
case x: ArithmeticException => {
println("ArithmeticException: detect divide by zero")
return Double.NaN
}
}
}
def array_sum (array: Array[Double]) : Double = {
var sum: Double = 0.0
for (i <- array) {
sum += i
}
return sum
}
def dot_product (vec1: Array[Double], vec2: Array[Double]) : Double = {
array_sum(op_two_arrays(vec1, vec2, multiple))
}
最佳答案
我尝试使用spark.util.Vector增加堆栈大小、本地化实用函数,但不幸的是他们都没有解决这个问题。然后我尝试将 Spark 从 0.7.2 降级到 0.6.3 ( https://github.com/mesos/spark/tree/branch-0.6 )。即使对于 10,000 x 10,000 的矩阵,它也能工作,并且不会再出现 Stack Overflow。我不知道它到底是如何修复它的,所以我发布了 RDD.scala 中 reduce
函数之间的区别:
--- spark-0.6.3/core/src/main/scala/spark/RDD.scala 2013-06-27 11:31:12.628017194 -0700
+++ spark-0.7.2/core/src/main/scala/spark/RDD.scala 2013-06-27 13:42:22.844686240 -0700
@@ -316,39 +468,93 @@
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
+ // println("RDD.reduce: after sc.clean")
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
- }else {
+ } else {
None
}
}
- val options = sc.runJob(this, reducePartition)
- val results = new ArrayBuffer[T]
- for (opt <- options; elem <- opt) {
- results += elem
- }
- if (results.size == 0) {
- throw new UnsupportedOperationException("empty collection")
- } else {
- return results.reduceLeft(cleanF)
+ // println("RDD.reduce: after reducePartition")
+ var jobResult: Option[T] = None
+ val mergeResult = (index: Int, taskResult: Option[T]) => {
+ if (taskResult != None) {
+ jobResult = jobResult match {
+ case Some(value) => Some(f(value, taskResult.get))
+ case None => taskResult
+ }
+ }
}
+ // println("RDD.reduce: after jobResult")
+ sc.runJob(this, reducePartition, mergeResult)
+ // println("RDD.reduce: after sc.runJob")
+ // Get the final result out of our Option, or throw an exception if the RDD was empty
+ jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+ // println("RDD.reduce: finished")
}
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+ * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
def fold(zeroValue: T)(op: (T, T) => T): T = {
+ // Clone the zero value since we will also be serializing it as part of tasks
+ var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
- val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
- return results.fold(zeroValue)(cleanOp)
+ val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+ val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+ sc.runJob(this, foldPartition, mergeResult)
+ jobResult
}
关于java - Spark : reduce causes StackOverflowError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17311733/
我不知道如何从 reducerRegister.js 中的 reducerForm.js reducer 访问 bool 值 isLoading 标志。我使用了 combineReducers() 并
我正在尝试找到一种理想的方法来更新我的状态树上的几个顶级字段,同时仍然维护拆分 reducer 。 这是我想出的一个简单的解决方案。 var state = { fileOrder: [0],
如果我们想按相同的键(第一个 reducer 的输出)分组,是否可以将 reducer 的输出直接发送到另一个 reducer 有时在链接时我发现我正在使用一个映射器来读取输入并将其复制到输出。因此想
我有一个如下所示的 reducer : const chart = combineReducers({ data, fetchProgress, fetchError,
当Map Reduce代码中有多个reduce时,它们之间没有任何形式的通信。但是,当执行诸如聚合之类的操作时,所有化简器共同产生单个最终输出。当它们之间没有通信时,聚合如何发生?是通过写入上下文吗?
我在 hive 中有一个表,我想从中获取所有数据。问题是: select * from tbl; 给我的结果与以下情况截然不同: select count(*) from tbl; 这是为什么?第二个
假设我有一个带有两个 reducer 的应用程序 - 使用 combineReducers() 组合的 tables 和 footer。 当我点击某个按钮时,将分派(dispatch)两个操作 - 一
我正在学习更深入的 redux,并且在处理高阶 reducer 时遇到一些麻烦。 我试图使用一个简单的分页示例来了解它是如何工作的。 NB:下面的代码只是 Nodejs 上下文中 redux 的一个快
我调用 RSS 提要并使用解析器对其进行解析。我收到一个数组。我现在想在最后创建一个对象,看起来像这样: { "2019-06-13": { "rates": { "usd":
我有一份学生列表,我的应用程序始终显示当时的一个学生,即 activePupil。到目前为止我有两个 reducer 。其中一个包含并默认返回所有子项的列表(以数组的形式): [ { id:
我有一个叫做 animals 的特征缩减器(切片缩减器)。我想将这些 reducer 拆分为哺乳动物、鸟类、鱼类等。这部分很简单,因为我可以简单地使用 ActionReducerMap。 现在假设哺乳
空数组上的简单reduce会抛出: 线程“main”java.lang.UnsupportedOperationException 中的异常:无法减少空的可迭代对象。 链接时同样的异常: val a
我有一些 25k 文档(原始 json 中为 4 GB)的数据,我想对其执行一些 javascript 操作,以使我的最终数据使用者 (R) 更容易访问这些数据,并且我想通过为每个更改添加一个新集合来
我只是想验证我对这些参数及其关系的理解,如果我错了请通知我。 mapreduce.reduce.shuffle.input.buffer.percent 告诉分配给 reducer 的整个洗牌阶段的内
我想将 redux 状态的值从 reducer 传递到另一个 reducer。在我的例子中,我想将 groups 的值从 groupReducer.js 中的状态传递到 scheduleReducer
所以,我有一个应用程序,它有多个 reducer ,因此有多个关联的 Action 创建者。 有一段时间,我的一个 reducer 更新了状态(由于编辑),因此,我必须确保其他 reducer 看到此
我有一个 reducer ,可以在调度操作时重新调整适当的状态。现在我定期调用 API,因此结果会一次又一次地触发操作。所以我想要的是,如果 reducer 状态已经有数据,那么另一个 reducer
当我尝试执行来自 here 的 DISTINCT reduce 时,出现错误。我已经在啤酒 sample 桶上重现了这个错误,所以这应该很容易重现。我没有在 mapreduce_errors.txt
在以下语法的简单优先级解析(分解)中,我们存在 shift-reduce 和 reduce-reduce 冲突。 X 是开始符号,X'-->$X$ 是添加规则。另外+和下符号是终结符。 X'-->$X
我需要编写一个连续调用两个reducer的Mapreduce程序。即,第一个 reducer 的输出将是第二个 reducer 的输入。我如何实现这一目标? 到目前为止我发现的内容表明我需要在我的驱动
我是一名优秀的程序员,十分优秀!