
apache-spark - Spark : graphx api OOM errors after unpersist useless RDDs

Reposted. Author: 行者123. Updated: 2023-12-04 12:28:56

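I am getting an Out Of Memory error for an unknown reason. I unpersist useless RDDs immediately, but after a few rounds of the loop the OOM error still occurs. My code is as follows: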

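import org.apache.spark.HashPartitioner
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.util.Random

object dia {
  // single source shortest path
  def sssp[VD](graph: Graph[VD, Double], source: VertexId): Graph[Double, Double] = {
    graph.mapVertices((id, _) => if (id == source) 0.0 else Double.PositiveInfinity)
      .pregel(Double.PositiveInfinity)(
        (id, dist, newDist) => scala.math.min(dist, newDist), // vertex program: keep the shorter distance
        triplet => { // send a message only if the path through srcId is shorter
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
        },
        (a, b) => math.min(a, b) // merge messages: keep the minimum
      )
  }

  // randomly pick the candidate with the lowest lower bound or the highest upper bound
  def selectCandidate(candidates: RDD[(VertexId, (Double, Double))]): VertexId = {
    Random.setSeed(System.nanoTime())
    val selectLow = Random.nextBoolean()
    val (vid, (_, _)) = if (selectLow) {
      println("Select lowest bound")
      candidates.reduce((x, y) => if (x._2._1 < y._2._1) x else y)
    } else {
      println("Select highest bound")
      candidates.reduce((x, y) => if (x._2._2 > y._2._2) x else y)
    }
    vid
  }
}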

val g = {/* load graph from hdfs*/}.partitionBy(EdgePartition2D, eParts).cache
println("Vertices Size: " + g.vertices.count)
println("Edges Size: " + g.edges.count)

val resultDiameter = {

  val diff = 0d
  val maxIterations = 100
  val filterJoin = 1e5
  val vParts = 100

  var deltaHigh = Double.PositiveInfinity
  var deltaLow = Double.NegativeInfinity

  var candidates = g.vertices.map(x => (x._1, (Double.NegativeInfinity,
      Double.PositiveInfinity)))
    .partitionBy(new HashPartitioner(vParts))
    .persist(StorageLevel.MEMORY_AND_DISK) // (vid, (low, high))

  var round = 0
  var candidateCount = candidates.count
  while (deltaHigh - deltaLow > diff && candidateCount > 0 && round <= maxIterations) {

    val currentVertex = dia.selectCandidate(candidates)

    val dist: RDD[(VertexId, Double)] = dia.sssp(g, currentVertex)
      .vertices
      .partitionBy(new HashPartitioner(vParts)) // join more efficiently
      .persist(StorageLevel.MEMORY_AND_DISK)
    val eccentricity = dist.map({ case (vid, length) => length }).max
    println("Eccentricity = %.1f".format(eccentricity))

    val subDist = if (candidateCount > filterJoin) {
      println("Directly use Dist")
      dist
    } else { // when candidates is smaller than filterJoin, filter out the useless vertices
      println("Filter Dist")
      val candidatesMap = candidates.sparkContext.broadcast(
        candidates.collect.toMap)
      val subDist = dist.filter({ case (vid, length) =>
        candidatesMap.value.contains(vid) })
        .persist(StorageLevel.MEMORY_AND_DISK)
      println("Sub Dist Count: " + subDist.count)
      subDist
    }

    var previousCandidates = candidates
    candidates = candidates.join(subDist).map({ case (vid, ((low, high), d)) =>
      (vid,
        (Array(low, eccentricity - d, d).max,
          Array(high, eccentricity + d).min))
    }).persist(StorageLevel.MEMORY_AND_DISK)
    candidateCount = candidates.count
    println("Candidates Count 1 : " + candidateCount)
    previousCandidates.unpersist(true) // release useless rdd
    dist.unpersist(true) // release useless rdd

    deltaLow = Array(deltaLow,
      candidates.map({ case (_, (low, _)) => low }).max).max
    deltaHigh = Array(deltaHigh, 2 * eccentricity,
      candidates.map({ case (_, (_, high)) => high }).max).min

    previousCandidates = candidates
    candidates = candidates.filter({ case (_, (low, high)) =>
      !((high <= deltaLow && low >= deltaHigh / 2d) || low == high)
    })
      .partitionBy(new HashPartitioner(vParts)) // join more efficiently
      .persist(StorageLevel.MEMORY_AND_DISK)
    candidateCount = candidates.count
    println("Candidates Count 2:" + candidateCount)
    previousCandidates.unpersist(true) // release useless rdd

    round += 1
    println(s"Round=${round},Low=${deltaLow}, High=${deltaHigh}, Candidates=${candidateCount}")
  }

  deltaLow
}

println(s"Diameter $resultDiameter")
println("Complete!")
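To make the per-round pruning easier to follow, here is a minimal single-machine sketch that mirrors one iteration of the bound update and filter from the loop above, using plain Scala `Map`s instead of RDDs. `DiameterBounds`, `update`, `keep` and `round` are hypothetical names; the formulas are copied from the question's code, and the bound derivation assumes distances are symmetric (an undirected graph).

```scala
// Single-machine mirror of one round of the bound update and pruning from the loop above.
// Hypothetical names; the formulas are copied from the question's code.
object DiameterBounds {
  type Bounds = (Double, Double) // (low, high) bounds on one vertex's eccentricity

  // With ecc = eccentricity of the selected vertex and d = its distance to this vertex,
  // the triangle inequality (assuming symmetric distances) gives
  // max(d, ecc - d) <= eccentricity(vertex) <= ecc + d.
  def update(b: Bounds, ecc: Double, d: Double): Bounds = {
    val (low, high) = b
    (Array(low, ecc - d, d).max, Array(high, ecc + d).min)
  }

  // Same predicate as the question's filter: drop a vertex whose eccentricity is known
  // exactly (low == high) or whose bounds can no longer improve either global bound.
  def keep(b: Bounds, deltaLow: Double, deltaHigh: Double): Boolean = {
    val (low, high) = b
    !((high <= deltaLow && low >= deltaHigh / 2d) || low == high)
  }

  // One loop iteration: inner "join" on vertex id, update bounds, tighten the global
  // diameter bounds, then filter. Returns (remaining candidates, deltaLow, deltaHigh).
  def round(
      cands: Map[Long, Bounds],
      dist: Map[Long, Double],
      ecc: Double,
      deltaLow: Double,
      deltaHigh: Double): (Map[Long, Bounds], Double, Double) = {
    val updated: Map[Long, Bounds] =
      cands.collect { case (v, b) if dist.contains(v) => v -> update(b, ecc, dist(v)) }
    val newLow = math.max(deltaLow, updated.values.map(_._1).max)
    val newHigh = Array(deltaHigh, 2 * ecc, updated.values.map(_._2).max).min
    (updated.filter { case (_, b) => keep(b, newLow, newHigh) }, newLow, newHigh)
  }
}
```

For example, on the path graph 0-1-2-3-4 with unit weights, starting from vertex 0 (eccentricity 4), vertex 0's bounds collapse to (4, 4) so it is dropped, and the global bounds become deltaLow = 4, deltaHigh = 8. The shrinking candidate counts in the output below come from exactly this filter.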

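The main data in the while block are the graph object g and the RDD candidates. g is used to compute the single-source shortest paths in every round, and the graph structure does not change. The size of candidates decreases round by round.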
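One object the loop never releases is the graph returned by `dia.sssp` itself: the loop keeps only its `.vertices`, but as far as I can tell from Spark's `Pregel` source, each intermediate graph is cached and the final one is returned still cached. A minimal sketch of releasing it once `dist` has been materialized; `ReleaseSssp` and `distancesAndRelease` are hypothetical names, and `runSssp` stands in for the question's `dia.sssp(g, currentVertex)`.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object ReleaseSssp {
  // Hypothetical helper. `runSssp` stands in for `dia.sssp(g, currentVertex)`; the graph it
  // returns comes out of Pregel still cached, so it is released here once `dist` is materialized.
  def distancesAndRelease(
      runSssp: () => Graph[Double, Double],
      vParts: Int): RDD[(VertexId, Double)] = {
    val result = runSssp()
    val dist = result.vertices
      .partitionBy(new HashPartitioner(vParts)) // same partitioning as in the question
      .persist(StorageLevel.MEMORY_AND_DISK)
    dist.count()                       // materialize dist while its parent is still cached
    result.unpersist(blocking = false) // drop the cached vertices and edges of the SSSP result graph
    dist
  }
}
```

In the loop this would replace the `val dist = dia.sssp(g, currentVertex).vertices...` statement with `val dist = ReleaseSssp.distancesAndRelease(() => dia.sssp(g, currentVertex), vParts)`. Unpersisting the result graph does not touch `g`, whose own cached vertices and edges are separate RDDs.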

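In each round I manually unpersist the useless RDDs in blocking mode, so I expected there to be enough memory for the subsequent operations. However, the job stops with an OOM at round 7 or round 6, seemingly at random. By the time the program reaches round 6 or 7, the number of candidates has dropped sharply, to roughly 10% or less of the original. A sample of the output follows; the candidate count decreases from 15,288,624 in round 1 to 67,451 in round 7: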
Vertices Size: 15,288,624
Edges Size: 228,097,574
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 15288624
Candidates Count 2:15288623
Round=1,Low=12.0, High=24.0, Candidates=15288623
Select lowest bound
Eccentricity = 13.0
Directly use Dist
Candidates Count 1 : 15288623
Candidates Count 2:15288622
Round=2,Low=13.0, High=24.0, Candidates=15288622
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 15288622
Candidates Count 2:6578370
Round=3,Low=18.0, High=23.0, Candidates=6578370
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 6578370
Candidates Count 2:6504563
Round=4,Low=18.0, High=23.0, Candidates=6504563
Select lowest bound
Eccentricity = 11.0
Directly use Dist
Candidates Count 1 : 6504563
Candidates Count 2:412789
Round=5,Low=18.0, High=22.0, Candidates=412789
Select highest bound
Eccentricity = 17.0
Directly use Dist
Candidates Count 1 : 412789
Candidates Count 2:288670
Round=6,Low=18.0, High=22.0, Candidates=288670
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 288670
Candidates Count 2:67451
Round=7,Low=18.0, High=22.0, Candidates=67451

The tail of the spark.info log:
6/12/12 14:03:09 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:06:21 INFO YarnAllocator: Canceling requests for 0 executor containers
16/12/12 14:06:33 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:14:26 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:18:14 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:123)
at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:218)
at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:260)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:347)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:744)
16/12/12 14:18:14 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272
java.io.EOFException: Premature EOF: no length prefix available
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
16/12/12 14:14:39 WARN AbstractConnector:
java.lang.OutOfMemoryError: Java heap space
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:233)
at org.spark-project.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
at org.spark-project.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:744)
16/12/12 14:20:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 1 time(s) from Reporter thread.)
16/12/12 14:19:38 WARN DFSClient: Error Recovery for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272 in pipeline 100.76.15.28:9003, 100.76.48.218:9003, 100.76.48.199:9003: bad datanode 100.76.15.28:9003
16/12/12 14:18:58 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/12/12 14:20:49 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-198] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:20:49 INFO SparkContext: Invoking stop() from shutdown hook
16/12/12 14:20:49 INFO ContextCleaner: Cleaned shuffle 446
16/12/12 14:20:49 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
at scala.concurrent.Future$class.recover(Future.scala:324)
at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:376)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:104)
at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1630)
at org.apache.spark.ContextCleaner.doCleanupRDD(ContextCleaner.scala:208)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:185)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:364)
... 12 more
16/12/12 14:20:49 WARN QueuedThreadPool: 5 threads could not be stopped
16/12/12 14:20:49 INFO SparkUI: Stopped Spark web UI at http://10.215.154.152:56338
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/12/12 14:21:04 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)

The tail of gc.log:
2016-12-12T14:10:43.541+0800: 16832.953: [Full GC 2971008K->2971007K(2971008K), 11.4284920 secs]
2016-12-12T14:10:54.990+0800: 16844.403: [Full GC 2971007K->2971007K(2971008K), 11.4479110 secs]
2016-12-12T14:11:06.457+0800: 16855.870: [GC 2971007K(2971008K), 0.6827710 secs]
2016-12-12T14:11:08.825+0800: 16858.237: [Full GC 2971007K->2971007K(2971008K), 11.5480350 secs]
2016-12-12T14:11:20.384+0800: 16869.796: [Full GC 2971007K->2971007K(2971008K), 11.0481490 secs]
2016-12-12T14:11:31.442+0800: 16880.855: [Full GC 2971007K->2971007K(2971008K), 11.0184790 secs]
2016-12-12T14:11:42.472+0800: 16891.884: [Full GC 2971008K->2971008K(2971008K), 11.3124900 secs]
2016-12-12T14:11:53.795+0800: 16903.207: [Full GC 2971008K->2971008K(2971008K), 10.9517160 secs]
2016-12-12T14:12:04.760+0800: 16914.172: [Full GC 2971008K->2971007K(2971008K), 11.0969500 secs]
2016-12-12T14:12:15.868+0800: 16925.281: [Full GC 2971008K->2971008K(2971008K), 11.1244090 secs]
2016-12-12T14:12:27.003+0800: 16936.416: [Full GC 2971008K->2971008K(2971008K), 11.0206800 secs]
2016-12-12T14:12:38.035+0800: 16947.448: [Full GC 2971008K->2971008K(2971008K), 11.0024270 secs]
2016-12-12T14:12:49.048+0800: 16958.461: [Full GC 2971008K->2971008K(2971008K), 10.9831440 secs]
2016-12-12T14:13:00.042+0800: 16969.454: [GC 2971008K(2971008K), 0.7338780 secs]
2016-12-12T14:13:02.496+0800: 16971.908: [Full GC 2971008K->2971007K(2971008K), 11.1536860 secs]
2016-12-12T14:13:13.661+0800: 16983.074: [Full GC 2971007K->2971007K(2971008K), 10.9956150 secs]
2016-12-12T14:13:24.667+0800: 16994.080: [Full GC 2971007K->2971007K(2971008K), 11.0139660 secs]
2016-12-12T14:13:35.691+0800: 17005.104: [GC 2971007K(2971008K), 0.6693770 secs]
2016-12-12T14:13:38.115+0800: 17007.527: [Full GC 2971007K->2971006K(2971008K), 11.0514040 secs]
2016-12-12T14:13:49.178+0800: 17018.590: [Full GC 2971007K->2971007K(2971008K), 10.8881160 secs]
2016-12-12T14:14:00.076+0800: 17029.489: [GC 2971007K(2971008K), 0.7046370 secs]
2016-12-12T14:14:02.498+0800: 17031.910: [Full GC 2971007K->2971007K(2971008K), 11.3424300 secs]
2016-12-12T14:14:13.862+0800: 17043.274: [Full GC 2971008K->2971006K(2971008K), 11.6215890 secs]
2016-12-12T14:14:25.503+0800: 17054.915: [GC 2971006K(2971008K), 0.7196840 secs]
2016-12-12T14:14:27.857+0800: 17057.270: [Full GC 2971008K->2971007K(2971008K), 11.3879990 secs]
2016-12-12T14:14:39.266+0800: 17068.678: [Full GC 2971007K->2971007K(2971008K), 11.1611420 secs]
2016-12-12T14:14:50.446+0800: 17079.859: [GC 2971007K(2971008K), 0.6976180 secs]
2016-12-12T14:14:52.782+0800: 17082.195: [Full GC 2971007K->2971007K(2971008K), 11.4318900 secs]
2016-12-12T14:15:04.235+0800: 17093.648: [Full GC 2971007K->2971007K(2971008K), 11.3429010 secs]
2016-12-12T14:15:15.598+0800: 17105.010: [GC 2971007K(2971008K), 0.6832320 secs]
2016-12-12T14:15:17.930+0800: 17107.343: [Full GC 2971008K->2971007K(2971008K), 11.1898520 secs]
2016-12-12T14:15:29.131+0800: 17118.544: [Full GC 2971007K->2971007K(2971008K), 10.9680150 secs]
2016-12-12T14:15:40.110+0800: 17129.522: [GC 2971007K(2971008K), 0.7444890 secs]
2016-12-12T14:15:42.508+0800: 17131.920: [Full GC 2971007K->2971007K(2971008K), 11.3052160 secs]
2016-12-12T14:15:53.824+0800: 17143.237: [Full GC 2971007K->2971007K(2971008K), 10.9484100 secs]
2016-12-12T14:16:04.783+0800: 17154.196: [Full GC 2971007K->2971007K(2971008K), 10.9543950 secs]
2016-12-12T14:16:15.748+0800: 17165.160: [GC 2971007K(2971008K), 0.7066150 secs]
2016-12-12T14:16:18.176+0800: 17167.588: [Full GC 2971007K->2971007K(2971008K), 11.1201370 secs]
2016-12-12T14:16:29.307+0800: 17178.719: [Full GC 2971007K->2971007K(2971008K), 11.0746950 secs]
2016-12-12T14:16:40.392+0800: 17189.805: [Full GC 2971007K->2971007K(2971008K), 11.0036170 secs]
2016-12-12T14:16:51.407+0800: 17200.819: [Full GC 2971007K->2971007K(2971008K), 10.9655670 secs]
2016-12-12T14:17:02.383+0800: 17211.796: [Full GC 2971007K->2971007K(2971008K), 10.7348560 secs]
2016-12-12T14:17:13.128+0800: 17222.540: [GC 2971007K(2971008K), 0.6679470 secs]
2016-12-12T14:17:15.450+0800: 17224.862: [Full GC 2971007K->2971007K(2971008K), 10.6219270 secs]
2016-12-12T14:17:26.081+0800: 17235.494: [Full GC 2971007K->2971007K(2971008K), 10.9158450 secs]
2016-12-12T14:17:37.016+0800: 17246.428: [Full GC 2971007K->2971007K(2971008K), 11.3107490 secs]
2016-12-12T14:17:48.337+0800: 17257.750: [Full GC 2971007K->2971007K(2971008K), 11.0769460 secs]
2016-12-12T14:17:59.424+0800: 17268.836: [GC 2971007K(2971008K), 0.6707600 secs]
2016-12-12T14:18:01.850+0800: 17271.262: [Full GC 2971007K->2970782K(2971008K), 12.6348300 secs]
2016-12-12T14:18:14.496+0800: 17283.909: [GC 2970941K(2971008K), 0.7525790 secs]
2016-12-12T14:18:16.890+0800: 17286.303: [Full GC 2971006K->2970786K(2971008K), 13.1047470 secs]
2016-12-12T14:18:30.008+0800: 17299.421: [GC 2970836K(2971008K), 0.8139710 secs]
2016-12-12T14:18:32.458+0800: 17301.870: [Full GC 2971005K->2970873K(2971008K), 13.0410540 secs]
2016-12-12T14:18:45.512+0800: 17314.925: [Full GC 2971007K->2970893K(2971008K), 12.7169690 secs]
2016-12-12T14:18:58.239+0800: 17327.652: [GC 2970910K(2971008K), 0.7314350 secs]
2016-12-12T14:19:00.557+0800: 17329.969: [Full GC 2971008K->2970883K(2971008K), 11.1889000 secs]
2016-12-12T14:19:11.767+0800: 17341.180: [Full GC 2971006K->2970940K(2971008K), 11.4069700 secs]
2016-12-12T14:19:23.185+0800: 17352.597: [GC 2970950K(2971008K), 0.6689360 secs]
2016-12-12T14:19:25.484+0800: 17354.896: [Full GC 2971007K->2970913K(2971008K), 12.6980050 secs]
2016-12-12T14:19:38.194+0800: 17367.607: [Full GC 2971004K->2970902K(2971008K), 12.7641130 secs]
2016-12-12T14:19:50.968+0800: 17380.380: [GC 2970921K(2971008K), 0.6966130 secs]
2016-12-12T14:19:53.266+0800: 17382.678: [Full GC 2971007K->2970875K(2971008K), 12.9416660 secs]
2016-12-12T14:20:06.233+0800: 17395.645: [Full GC 2971007K->2970867K(2971008K), 13.2740780 secs]
2016-12-12T14:20:19.527+0800: 17408.939: [GC 2970881K(2971008K), 0.7696770 secs]
2016-12-12T14:20:22.024+0800: 17411.436: [Full GC 2971007K->2970886K(2971008K), 13.8729770 secs]
2016-12-12T14:20:35.919+0800: 17425.331: [Full GC 2971002K->2915146K(2971008K), 12.8270160 secs]
2016-12-12T14:20:48.762+0800: 17438.175: [GC 2915155K(2971008K), 0.6856650 secs]
2016-12-12T14:20:51.271+0800: 17440.684: [Full GC 2971007K->2915307K(2971008K), 12.4895750 secs]
2016-12-12T14:21:03.771+0800: 17453.184: [GC 2915320K(2971008K), 0.6249910 secs]
2016-12-12T14:21:06.377+0800: 17455.789: [Full GC 2971007K->2914274K(2971008K), 12.6835220 secs]
2016-12-12T14:21:19.129+0800: 17468.541: [GC 2917963K(2971008K), 0.6917090 secs]
2016-12-12T14:21:21.526+0800: 17470.938: [Full GC 2971007K->2913949K(2971008K), 13.0442320 secs]
2016-12-12T14:21:36.588+0800: 17486.000: [GC 2936827K(2971008K), 0.7244690 secs]

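So the logs suggest there may be a memory leak, which could be in one of two places:
1) my code, or 2) the code in the Spark GraphX API

If it is in my code, can anyone help me find the cause?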
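One way to narrow down whether something in your own loop is never released is to print what Spark still tracks as persisted at the end of every round. A minimal sketch; `CacheAudit` and `persistedSummary` are hypothetical names, while `SparkContext.getPersistentRDDs` is the existing Spark API it relies on.

```scala
import org.apache.spark.SparkContext

object CacheAudit {
  // Hypothetical diagnostic helper: one line per RDD that Spark still tracks as persisted.
  // sc.getPersistentRDDs maps RDD id -> RDD for every RDD that was persisted and has not
  // been unpersisted yet (explicitly or by the ContextCleaner).
  def persistedSummary(sc: SparkContext): Seq[String] =
    sc.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
      s"id=$id name=${rdd.name} level=${rdd.getStorageLevel}"
    }
}
```

Calling `CacheAudit.persistedSummary(sc).foreach(println)` after `round += 1` shows whether the list grows from round to round; entries that keep accumulating point at RDDs nobody unpersists. This only covers cached blocks on the executors. It says nothing about objects held on the driver heap, which is where the spark.info log places the final OOM (the `sparkDriver` actor system).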

Best answer

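I don't think the unpersist() API is what causes the out-of-memory error. The OutOfMemory error is caused by the collect() API, because collect() (which is an action, as opposed to a transformation) fetches the entire RDD to the single driver machine.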
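Following this answer, the `collect` plus `broadcast` in the question's "Filter Dist" branch could be replaced by a join that keeps the candidate set on the executors. A minimal sketch; `FilterWithoutCollect` and `restrictToCandidates` are hypothetical names, and it assumes both RDDs are keyed by `VertexId` as in the question. When both sides already share the same `HashPartitioner(vParts)`, Spark can join them without reshuffling either side.

```scala
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

object FilterWithoutCollect {
  // Keep only the distances of vertices that are still candidates, without collecting
  // `candidates` to the driver: the inner join does the filtering on the executors.
  def restrictToCandidates(
      dist: RDD[(VertexId, Double)],
      candidates: RDD[(VertexId, (Double, Double))]): RDD[(VertexId, Double)] =
    dist
      .join(candidates.mapValues(_ => ())) // mapValues keeps the candidates' partitioner
      .mapValues { case (d, _) => d }
}
```

Note also that the question's next step, `candidates.join(subDist)`, is itself an inner join, so it already discards non-candidate vertices even when `dist` is passed through unfiltered.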

A few suggestions:

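  • Increasing the driver memory is a partial solution, which you have already done. If you are using JDK 8, use the G1GC collector to manage large heaps.
  • You can choose a storage level (MEMORY_AND_DISK, OFF_HEAP, etc.) to fine-tune memory use for your application.

  • See the official guide for more details.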
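The GC and storage-level suggestions might look like this in code. A minimal sketch; `TuningSketch` is a hypothetical name, and `MEMORY_AND_DISK_SER` is only one example of a more compact storage level, not a value taken from the answer. The driver's own heap size and GC flags cannot be changed from inside the application, because the driver JVM is already running by then; they have to be passed to `spark-submit` (`--driver-memory`, `--conf spark.driver.extraJavaOptions=-XX:+UseG1GC`).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

object TuningSketch {
  // Executor JVM options can be set in SparkConf before the SparkContext is created.
  // Driver memory and driver JVM options must go to spark-submit instead (see above).
  def conf(): SparkConf = new SparkConf()
    .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

  // Serialized storage keeps cached partitions as byte arrays: a smaller heap footprint,
  // at the cost of deserialization CPU each time a partition is read.
  val candidateLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
}
```

In the question's loop, `candidateLevel` would take the place of `StorageLevel.MEMORY_AND_DISK` in the `persist` calls on `candidates` and `dist`.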

Regarding apache-spark - Spark : graphx api OOM errors after unpersist useless RDDs, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/41094475/
