
scala - ConcurrentModificationException when using Spark collectionAccumulator


I am trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and I am seeing a large number of SparkExceptions (caused by ConcurrentModificationException) in the logs. The application runs without these errors when I start a local Spark instance.

I have seen reports of similar errors when using accumulators, and my code does use a CollectionAccumulator. However, I have put synchronized blocks everywhere it is used, and that makes no difference. The accumulator-related code looks like this:

import scala.collection.JavaConverters._
import org.apache.spark.SparkContext

// The base trait declaring add and endOfBatch is elided in the question.
class MySparkClass(sc: SparkContext) {
  val myAccumulator = sc.collectionAccumulator[MyRecord]

  override def add(record: MyRecord) = {
    synchronized {
      myAccumulator.add(record)
    }
  }

  override def endOfBatch() = {
    synchronized {
      myAccumulator.value.asScala.foreach { record =>
        processIt(record)
      }
    }
  }
}

The exceptions do not cause the application to fail, but when endOfBatch is called and the code tries to read values from the accumulator, the accumulator is empty and processIt is never called.

We are using HDInsight version 3.6 with Spark version 2.3.0.

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:770)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
... 13 more

The following code is a more self-contained example that reproduces the problem. MyRecord is a simple case class containing only numeric values. The code runs without errors locally, but produces the errors above on the HDInsight cluster.

import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

// Assumed shape of MyRecord, based on the description above and the
// three-argument call below; the actual definition was not shown.
case class MyRecord(a: Int, b: Int, c: Int)

object MainDemo {
  def main(args: Array[String]): Unit = {
    val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
    val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

    sparkContext.binaryFiles("/my/files/here").foreach { _ =>
      for (i <- 1 to 100000) {
        val record = MyRecord(i, 0, 0)
        myAccumulator.add(record)
      }
    }

    myAccumulator.value.asScala.foreach { record =>
      // we expect this to be called once for each record that we 'add' above,
      // but it is never called
      println(record)
    }
  }
}

Best Answer

I doubt the synchronized blocks are actually helping. Neither custom accumulators nor any of the built-in ones are thread-safe, and they do not need to be: DAGScheduler.updateAccumulators, the method the Spark driver uses to fold task results into accumulator values after a task completes (successfully or with a failure), runs only on the single thread that executes the scheduling loop. Beyond that, accumulators are write-only data structures from the workers' point of view; each worker holds its own local reference to the accumulator, and only the driver is allowed to read an accumulator's value. When you say it works in local mode, that is because everything runs in a single JVM, whereas in cluster mode the driver and executors are separate JVM instances communicating through RPC calls.
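
Because a task thread and the executor's heartbeat thread (which serializes accumulator updates to the driver, as the stack trace above shows) can touch the same list concurrently, one possible workaround is a custom AccumulatorV2 that takes a single lock around every operation and hands out defensive copies. This is only a minimal sketch under those assumptions, not something this answer prescribes; the class name is illustrative.

import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.spark.util.AccumulatorV2

class SafeListAccumulator[T] extends AccumulatorV2[T, JList[T]] {
  private val list: JList[T] = new JArrayList[T]()

  override def isZero: Boolean = list.synchronized(list.isEmpty)

  override def copy(): SafeListAccumulator[T] = {
    val acc = new SafeListAccumulator[T]
    list.synchronized(acc.list.addAll(list))
    acc
  }

  override def reset(): Unit = list.synchronized(list.clear())

  override def add(v: T): Unit = list.synchronized(list.add(v))

  override def merge(other: AccumulatorV2[T, JList[T]]): Unit =
    list.synchronized(list.addAll(other.value))

  // Hand out a snapshot rather than the live list, so a concurrent
  // serializer always walks a stable copy.
  override def value: JList[T] = list.synchronized(new JArrayList[T](list))
}

Register it with sparkContext.register(new SafeListAccumulator[MyRecord], "myAccumulator") and use it in place of collectionAccumulator.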

It would also help to see what your MyRecord object looks like. And try simply ending the line with .value rather than iterating over the result directly:

myAccumulator.value
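
For illustration, one way to read that suggestion (a hedged sketch, not the answer's own code): on the driver, after the Spark action has finished, copy the accumulator's value into an immutable Scala collection and iterate over the snapshot instead of the live java.util.List.

import scala.collection.JavaConverters._

// Snapshot the accumulator on the driver once the action is done,
// then iterate over the immutable copy, not the live Java list.
val snapshot = myAccumulator.value.asScala.toList
snapshot.foreach(println)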

Regarding "scala - ConcurrentModificationException when using Spark collectionAccumulator", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53484774/
