gpt4 book ai didi

java - Apache Spark Accumulable addInPlace 需要返回 R1?或者有什么值(value)?

转载 作者:行者123 更新时间:2023-11-30 08:37:46 26 4
gpt4 key购买 nike

From the Spark source code for Accumulable is the addInPlace method合并来自不同分区的相同 Accumulable 的值:

/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R

我假设在我的 AccumulableParam 实现中定义 addInPlace 时,我可以返回任何我想要的值。我假设我作为 r1 传入的任何指针都会指向我返回的任何内容。

我的老板认为传入的 r1 是 return 语句中唯一允许的东西。这听起来很像 Ann-Landers,谁是对的?

有一种情况我只想扔掉 r1 并用 r2 中的对象替换它,这将是这个合并后的累加器的新值。

我可以只返回 r2 还是必须像我的(Java 编程经验多得多)老板认为的那样对 r1 进行深度复制?需要明确的是,虽然 Spark 当然是用 Scala 编写的,我正在编写一个用 Java 实现 AccumulableParam 的类。

最佳答案

根据经验,在执行类似折叠的操作时,您应该永远不要修改第二个参数。我们可以用一个简单的例子来说明为什么。假设我们有这样的简单累加器:

import org.apache.spark.AccumulatorParam
import scala.collection.mutable.{Map => MMap}

type ACC = MMap[String, Int]

object DummyAccumulatorParam extends AccumulatorParam[ACC] {
def zero(initialValue: ACC): ACC = {
initialValue
}

def addInPlace(acc: ACC, v: ACC): ACC = {
v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
v
}
}

特别有用,但没关系。重点是它修改了第二个参数。让我们看看它是否有效:

val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)

val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum1 += x)

accum1.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)

到目前为止一切顺利。我们甚至可以创建另一个,它仍然按预期工作:

val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum2 += x)

accum2.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)

现在让我们缓存数据:

rdd.cache

重复这个过程:

val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum3 += x)

val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum4 += x)

并检查累加器值:

accum4.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 6)

和RDD内容:

rdd.collect
// Array[scala.collection.mutable.Map[String,Int]] =
// Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))

因此如您所见,返回或修改第二个参数是不安全的。它也适用于其他类似的操作,例如 foldaggregate

关于java - Apache Spark Accumulable addInPlace 需要返回 R1?或者有什么值(value)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36951417/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com