gpt4 book ai didi

scala - 累加器在集群上失败,在本地工作

转载 作者:行者123 更新时间:2023-12-04 02:26:31 27 4
gpt4 key购买 nike

在官方 spark 文档中,有一个用于 foreach 中的累加器示例。直接在 RDD 上调用:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

我实现了自己的累加器:
val myCounter = sc.accumulator(0)

val myRDD = sc.textFile(inputpath) // :spark.RDD[String]

myRDD.flatMap(line => foo(line)) // line 69

def foo(line: String) = {
myCounter += 1 // line 82 throwing NullPointerException
// compute something on the input
}
println(myCounter.value)

在本地环境中,这很好用。但是,如果我在具有多台机器的 Spark 独立集群上运行此作业,则工作人员会抛出一个
13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
at MyClass$.foo(MyClass.scala:82)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

在增加累加器的行 myCounter .

我的问题是:累加器只能用于直接应用于 RDD 而不是嵌套函数的“顶级”匿名函数吗?
如果是,为什么我的调用在本地成功而在集群上失败?

编辑 : 增加了异常的详细程度。

最佳答案

就我而言,当我使用“扩展应用程序”创建一个 spark 应用程序时,accumulator 在闭包中为 null,如下所示

    object AccTest extends App {


val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

println("count:" + accum.value)

sc.stop
}
}

我用 main() 方法替换了 extends App,它在 HDP 2.4 的 YARN 集群中工作
object AccTest {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

println("count:" + accum.value)

sc.stop
}
}

工作过

关于scala - 累加器在集群上失败,在本地工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17794664/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com