Spark: GraphX API OOM errors after unpersisting useless RDDs

I ran into an unexplained Out Of Memory error. I unpersist the useless RDDs immediately, but after several rounds of the loop the OOM error still occurs. My code is as follows:

// single source shortest path
def sssp[VD](graph:Graph[VD,Double], source: VertexId): Graph[Double, Double] = {
    graph.mapVertices((id, _) => if (id == source) 0.0 else Double.PositiveInfinity)
        .pregel(Double.PositiveInfinity)(
            (id, dist, newDist) => scala.math.min(dist, newDist),
            triplet => {
                if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
                    Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
                }
                else {
                    Iterator.empty
                }
            },
            (a, b) => math.min(a, b)
        )
}

def selectCandidate(candidates: RDD[(VertexId, (Double, Double))]): VertexId = {
    Random.setSeed(System.nanoTime())
    val selectLow = Random.nextBoolean()
    val (vid, (_, _)) = if (selectLow) {
        println("Select lowest bound")
        candidates.reduce((x, y) => if (x._2._1 < y._2._1) x else y)
    } else {
        println("Select highest bound")
        candidates.reduce((x, y) => if (x._2._2 > y._2._2) x else y)
    }
    vid
}

val g = {/* load graph from hdfs*/}.partitionBy(EdgePartition2D,eParts).cache
println("Vertices Size: " +  g.vertices.count )
println("Edges Size: " +  g.edges.count )

val resultDiameter = {

    val diff = 0d
    val maxIterations = 100
    val filterJoin = 1e5
    val vParts = 100

    var deltaHigh = Double.PositiveInfinity
    var deltaLow = Double.NegativeInfinity

    var candidates = g.vertices.map(x => (x._1, (Double.NegativeInfinity,
        Double.PositiveInfinity)))
        .partitionBy(new HashPartitioner(vParts))
        .persist(StorageLevel.MEMORY_AND_DISK) // (vid, low, high)

    var round = 0
    var candidateCount = candidates.count
    while (deltaHigh - deltaLow > diff && candidateCount > 0 && round <= maxIterations) {

        val currentVertex = dia.selectCandidate(candidates)

        val dist: RDD[(VertexId, Double)] = dia.sssp(g, currentVertex)
            .vertices
            .partitionBy(new HashPartitioner(vParts)) // join more efficiently
            .persist(StorageLevel.MEMORY_AND_DISK)
        val eccentricity = dist.map({ case (vid, length) => length }).max
        println("Eccentricity = %.1f".format(eccentricity))

        val subDist =  if(candidateCount > filterJoin) {
            println("Directly use Dist")
            dist
        } else {  // when candidates is smaller than filterJoin, filter out the useless vertices
            println("Filter Dist")
            val candidatesMap = candidates.sparkContext.broadcast(
                candidates.collect.toMap)
            val subDist = dist.filter({case (vid, length) =>
                candidatesMap.value.contains(vid)})
                .persist(StorageLevel.MEMORY_AND_DISK)
            println("Sub Dist Count: " + subDist.count)
            subDist
        }

        var previousCandidates = candidates
        candidates = candidates.join(subDist).map({ case (vid, ((low, high), d)) =>
            (vid,
                (Array(low, eccentricity - d, d).max,
                    Array(high, eccentricity + d).min))
        }).persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 1 : " + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd
        dist.unpersist(true) // release useless rdd

        deltaLow = Array(deltaLow,
            candidates.map({ case (_, (low, _)) => low }).max).max
        deltaHigh = Array(deltaHigh, 2 * eccentricity,
            candidates.map({ case (_, (_, high)) => high }).max).min

        previousCandidates = candidates
        candidates = candidates.filter({ case (_, (low, high)) =>
            !((high <= deltaLow && low >= deltaHigh / 2d) || low == high)
        })
            .partitionBy(new HashPartitioner(vParts))  // join more efficiently
            .persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 2:" + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd

        round += 1
        println(s"Round=${round},Low=${deltaLow}, High=${deltaHigh}, Candidates=${candidateCount}")
    }

    deltaLow
}

println(s"Diameter $resultDiameter")
println("Complete!")

The main data inside the while block are a graph object g and an RDD candidates. g is used to compute the single-source shortest paths in each round, and the graph structure does not change. The size of candidates shrinks round by round.

In each round I manually unpersist the useless RDDs in blocking mode, so I would expect there to be enough memory for the subsequent operations. However, the job randomly stops with an OOM at round 7 or 6. By the time the program reaches round 6 or 7, candidates has shrunk drastically, to about 10% of its original size or less. A sample of the output follows; the candidates size decreases from 15,288,624 in round 1 to 67,451 in round 7:

Vertices Size: 15,288,624
Edges Size: 228,097,574
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 15288624
Candidates Count 2:15288623
Round=1,Low=12.0, High=24.0, Candidates=15288623
Select lowest bound
Eccentricity = 13.0
Directly use Dist
Candidates Count 1 : 15288623
Candidates Count 2:15288622
Round=2,Low=13.0, High=24.0, Candidates=15288622
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 15288622
Candidates Count 2:6578370
Round=3,Low=18.0, High=23.0, Candidates=6578370
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 6578370
Candidates Count 2:6504563
Round=4,Low=18.0, High=23.0, Candidates=6504563
Select lowest bound
Eccentricity = 11.0
Directly use Dist
Candidates Count 1 : 6504563
Candidates Count 2:412789
Round=5,Low=18.0, High=22.0, Candidates=412789
Select highest bound
Eccentricity = 17.0
Directly use Dist
Candidates Count 1 : 412789
Candidates Count 2:288670
Round=6,Low=18.0, High=22.0, Candidates=288670
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 288670
Candidates Count 2:67451
Round=7,Low=18.0, High=22.0, Candidates=67451

The tail of the spark.info log:

6/12/12 14:03:09 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:06:21 INFO YarnAllocator: Canceling requests for 0 executor containers
16/12/12 14:06:33 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:14:26 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:18:14 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
    at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:123)
    at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:218)
    at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:260)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:347)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:744)
16/12/12 14:18:14 WARN DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
16/12/12 14:14:39 WARN AbstractConnector: 
java.lang.OutOfMemoryError: Java heap space
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:233)
    at org.spark-project.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
    at org.spark-project.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
    at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.spark-project.jetty.util.thread.QueuedThreadPool.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Thread.java:744)
16/12/12 14:20:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 1 time(s) from Reporter thread.)
16/12/12 14:19:38 WARN DFSClient: Error Recovery for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272 in pipeline 100.76.15.28:9003, 100.76.48.218:9003, 100.76.48.199:9003: bad datanode 100.76.15.28:9003
16/12/12 14:18:58 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/12/12 14:20:49 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-198] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:20:49 INFO SparkContext: Invoking stop() from shutdown hook
16/12/12 14:20:49 INFO ContextCleaner: Cleaned shuffle 446
16/12/12 14:20:49 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.recover(Future.scala:324)
    at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:376)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:104)
    at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1630)
    at org.apache.spark.ContextCleaner.doCleanupRDD(ContextCleaner.scala:208)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$$anonfun$apply$mcV$sp.apply(ContextCleaner.scala:185)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$$anonfun$apply$mcV$sp.apply(ContextCleaner.scala:180)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning.apply$mcV$sp(ContextCleaner.scala:180)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
    at org.apache.spark.ContextCleaner$$anon.run(ContextCleaner.scala:68)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:364)
    ... 12 more
16/12/12 14:20:49 WARN QueuedThreadPool: 5 threads could not be stopped
16/12/12 14:20:49 INFO SparkUI: Stopped Spark web UI at http://10.215.154.152:56338
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/12/12 14:21:04 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover.apply(Try.scala:185)

The tail of the gc.log:

2016-12-12T14:10:43.541+0800: 16832.953: [Full GC 2971008K->2971007K(2971008K), 11.4284920 secs]
2016-12-12T14:10:54.990+0800: 16844.403: [Full GC 2971007K->2971007K(2971008K), 11.4479110 secs]
2016-12-12T14:11:06.457+0800: 16855.870: [GC 2971007K(2971008K), 0.6827710 secs]
2016-12-12T14:11:08.825+0800: 16858.237: [Full GC 2971007K->2971007K(2971008K), 11.5480350 secs]
2016-12-12T14:11:20.384+0800: 16869.796: [Full GC 2971007K->2971007K(2971008K), 11.0481490 secs]
2016-12-12T14:11:31.442+0800: 16880.855: [Full GC 2971007K->2971007K(2971008K), 11.0184790 secs]
2016-12-12T14:11:42.472+0800: 16891.884: [Full GC 2971008K->2971008K(2971008K), 11.3124900 secs]
2016-12-12T14:11:53.795+0800: 16903.207: [Full GC 2971008K->2971008K(2971008K), 10.9517160 secs]
2016-12-12T14:12:04.760+0800: 16914.172: [Full GC 2971008K->2971007K(2971008K), 11.0969500 secs]
2016-12-12T14:12:15.868+0800: 16925.281: [Full GC 2971008K->2971008K(2971008K), 11.1244090 secs]
2016-12-12T14:12:27.003+0800: 16936.416: [Full GC 2971008K->2971008K(2971008K), 11.0206800 secs]
2016-12-12T14:12:38.035+0800: 16947.448: [Full GC 2971008K->2971008K(2971008K), 11.0024270 secs]
2016-12-12T14:12:49.048+0800: 16958.461: [Full GC 2971008K->2971008K(2971008K), 10.9831440 secs]
2016-12-12T14:13:00.042+0800: 16969.454: [GC 2971008K(2971008K), 0.7338780 secs]
2016-12-12T14:13:02.496+0800: 16971.908: [Full GC 2971008K->2971007K(2971008K), 11.1536860 secs]
2016-12-12T14:13:13.661+0800: 16983.074: [Full GC 2971007K->2971007K(2971008K), 10.9956150 secs]
2016-12-12T14:13:24.667+0800: 16994.080: [Full GC 2971007K->2971007K(2971008K), 11.0139660 secs]
2016-12-12T14:13:35.691+0800: 17005.104: [GC 2971007K(2971008K), 0.6693770 secs]
2016-12-12T14:13:38.115+0800: 17007.527: [Full GC 2971007K->2971006K(2971008K), 11.0514040 secs]
2016-12-12T14:13:49.178+0800: 17018.590: [Full GC 2971007K->2971007K(2971008K), 10.8881160 secs]
2016-12-12T14:14:00.076+0800: 17029.489: [GC 2971007K(2971008K), 0.7046370 secs]
2016-12-12T14:14:02.498+0800: 17031.910: [Full GC 2971007K->2971007K(2971008K), 11.3424300 secs]
2016-12-12T14:14:13.862+0800: 17043.274: [Full GC 2971008K->2971006K(2971008K), 11.6215890 secs]
2016-12-12T14:14:25.503+0800: 17054.915: [GC 2971006K(2971008K), 0.7196840 secs]
2016-12-12T14:14:27.857+0800: 17057.270: [Full GC 2971008K->2971007K(2971008K), 11.3879990 secs]
2016-12-12T14:14:39.266+0800: 17068.678: [Full GC 2971007K->2971007K(2971008K), 11.1611420 secs]
2016-12-12T14:14:50.446+0800: 17079.859: [GC 2971007K(2971008K), 0.6976180 secs]
2016-12-12T14:14:52.782+0800: 17082.195: [Full GC 2971007K->2971007K(2971008K), 11.4318900 secs]
2016-12-12T14:15:04.235+0800: 17093.648: [Full GC 2971007K->2971007K(2971008K), 11.3429010 secs]
2016-12-12T14:15:15.598+0800: 17105.010: [GC 2971007K(2971008K), 0.6832320 secs]
2016-12-12T14:15:17.930+0800: 17107.343: [Full GC 2971008K->2971007K(2971008K), 11.1898520 secs]
2016-12-12T14:15:29.131+0800: 17118.544: [Full GC 2971007K->2971007K(2971008K), 10.9680150 secs]
2016-12-12T14:15:40.110+0800: 17129.522: [GC 2971007K(2971008K), 0.7444890 secs]
2016-12-12T14:15:42.508+0800: 17131.920: [Full GC 2971007K->2971007K(2971008K), 11.3052160 secs]
2016-12-12T14:15:53.824+0800: 17143.237: [Full GC 2971007K->2971007K(2971008K), 10.9484100 secs]
2016-12-12T14:16:04.783+0800: 17154.196: [Full GC 2971007K->2971007K(2971008K), 10.9543950 secs]
2016-12-12T14:16:15.748+0800: 17165.160: [GC 2971007K(2971008K), 0.7066150 secs]
2016-12-12T14:16:18.176+0800: 17167.588: [Full GC 2971007K->2971007K(2971008K), 11.1201370 secs]
2016-12-12T14:16:29.307+0800: 17178.719: [Full GC 2971007K->2971007K(2971008K), 11.0746950 secs]
2016-12-12T14:16:40.392+0800: 17189.805: [Full GC 2971007K->2971007K(2971008K), 11.0036170 secs]
2016-12-12T14:16:51.407+0800: 17200.819: [Full GC 2971007K->2971007K(2971008K), 10.9655670 secs]
2016-12-12T14:17:02.383+0800: 17211.796: [Full GC 2971007K->2971007K(2971008K), 10.7348560 secs]
2016-12-12T14:17:13.128+0800: 17222.540: [GC 2971007K(2971008K), 0.6679470 secs]
2016-12-12T14:17:15.450+0800: 17224.862: [Full GC 2971007K->2971007K(2971008K), 10.6219270 secs]
2016-12-12T14:17:26.081+0800: 17235.494: [Full GC 2971007K->2971007K(2971008K), 10.9158450 secs]
2016-12-12T14:17:37.016+0800: 17246.428: [Full GC 2971007K->2971007K(2971008K), 11.3107490 secs]
2016-12-12T14:17:48.337+0800: 17257.750: [Full GC 2971007K->2971007K(2971008K), 11.0769460 secs]
2016-12-12T14:17:59.424+0800: 17268.836: [GC 2971007K(2971008K), 0.6707600 secs]
2016-12-12T14:18:01.850+0800: 17271.262: [Full GC 2971007K->2970782K(2971008K), 12.6348300 secs]
2016-12-12T14:18:14.496+0800: 17283.909: [GC 2970941K(2971008K), 0.7525790 secs]
2016-12-12T14:18:16.890+0800: 17286.303: [Full GC 2971006K->2970786K(2971008K), 13.1047470 secs]
2016-12-12T14:18:30.008+0800: 17299.421: [GC 2970836K(2971008K), 0.8139710 secs]
2016-12-12T14:18:32.458+0800: 17301.870: [Full GC 2971005K->2970873K(2971008K), 13.0410540 secs]
2016-12-12T14:18:45.512+0800: 17314.925: [Full GC 2971007K->2970893K(2971008K), 12.7169690 secs]
2016-12-12T14:18:58.239+0800: 17327.652: [GC 2970910K(2971008K), 0.7314350 secs]
2016-12-12T14:19:00.557+0800: 17329.969: [Full GC 2971008K->2970883K(2971008K), 11.1889000 secs]
2016-12-12T14:19:11.767+0800: 17341.180: [Full GC 2971006K->2970940K(2971008K), 11.4069700 secs]
2016-12-12T14:19:23.185+0800: 17352.597: [GC 2970950K(2971008K), 0.6689360 secs]
2016-12-12T14:19:25.484+0800: 17354.896: [Full GC 2971007K->2970913K(2971008K), 12.6980050 secs]
2016-12-12T14:19:38.194+0800: 17367.607: [Full GC 2971004K->2970902K(2971008K), 12.7641130 secs]
2016-12-12T14:19:50.968+0800: 17380.380: [GC 2970921K(2971008K), 0.6966130 secs]
2016-12-12T14:19:53.266+0800: 17382.678: [Full GC 2971007K->2970875K(2971008K), 12.9416660 secs]
2016-12-12T14:20:06.233+0800: 17395.645: [Full GC 2971007K->2970867K(2971008K), 13.2740780 secs]
2016-12-12T14:20:19.527+0800: 17408.939: [GC 2970881K(2971008K), 0.7696770 secs]
2016-12-12T14:20:22.024+0800: 17411.436: [Full GC 2971007K->2970886K(2971008K), 13.8729770 secs]
2016-12-12T14:20:35.919+0800: 17425.331: [Full GC 2971002K->2915146K(2971008K), 12.8270160 secs]
2016-12-12T14:20:48.762+0800: 17438.175: [GC 2915155K(2971008K), 0.6856650 secs]
2016-12-12T14:20:51.271+0800: 17440.684: [Full GC 2971007K->2915307K(2971008K), 12.4895750 secs]
2016-12-12T14:21:03.771+0800: 17453.184: [GC 2915320K(2971008K), 0.6249910 secs]
2016-12-12T14:21:06.377+0800: 17455.789: [Full GC 2971007K->2914274K(2971008K), 12.6835220 secs]
2016-12-12T14:21:19.129+0800: 17468.541: [GC 2917963K(2971008K), 0.6917090 secs]
2016-12-12T14:21:21.526+0800: 17470.938: [Full GC 2971007K->2913949K(2971008K), 13.0442320 secs]
2016-12-12T14:21:36.588+0800: 17486.000: [GC 2936827K(2971008K), 0.7244690 secs]

So the logs suggest there may be a memory leak, and it could be in one of two places: 1) my code, or 2) the Spark GraphX API.

If it is in my code, can anyone help me find the cause?

I have not fully solved the problem yet, but I have partially worked around it:

  • Increase the driver memory. I mentioned above that it stops at round 6 or 7, but when I doubled the driver memory it stopped at round 14. So I think driver-memory OOM may be one cause.
  • Save the candidates RDD to HDFS and continue from it in the next run, so the previous computation is not wasted.
  • Serialize the candidates RDD with Kryo. It spends some extra computation on encoding and decoding, but saves a lot of memory. (A sketch of the last two points follows this list.)
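
For the last two points, a minimal sketch of what this could look like (the checkpoint path, the checkpoint policy, and the registered classes are illustrative assumptions, not the exact code I ran):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import org.apache.spark.storage.StorageLevel

// Kryo plus a serialized storage level keeps candidates much smaller in
// memory than the default Java-object representation.
val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[(VertexId, (Double, Double))]))
val sc = new SparkContext(conf)

// Inside the round loop: persist the new candidates in serialized form and
// periodically dump them to HDFS, so a failed run can resume from the last
// saved round instead of starting from scratch.
val checkpointEvery = 3                                        // hypothetical policy
val checkpointPath = s"hdfs:///tmp/diameter/candidates_$round" // hypothetical path
candidates.persist(StorageLevel.MEMORY_AND_DISK_SER)
if (round % checkpointEvery == 0) {
    candidates.saveAsObjectFile(checkpointPath)
}

// On restart: reload the last saved candidates and continue the loop.
val restored = sc.objectFile[(VertexId, (Double, Double))](checkpointPath)
    .partitionBy(new HashPartitioner(vParts))
    .persist(StorageLevel.MEMORY_AND_DISK_SER)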

This is not a perfect solution, but it does work for my case. I still hope someone else can offer a perfect one.

I don't think the unpersist() API causes the out-of-memory error. The OutOfMemory is caused by the collect() API, because collect() (which is an Action, unlike a Transformation) fetches the entire RDD onto the single driver machine.
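
As an illustration only (a sketch, not the asker's actual code), the collect-and-broadcast in the filterJoin branch could be replaced with a distributed join, so the candidate set never has to fit on the driver:

// Instead of broadcasting candidates.collect.toMap and filtering dist,
// keep the work on the executors: join dist against the candidate ids.
// (Assumes the same candidates/dist RDDs and StorageLevel import as above.)
val candidateIds = candidates.mapValues(_ => ())  // keep only the vertex ids
val subDist = dist.join(candidateIds)             // retains candidate vertices only
    .mapValues { case (length, _) => length }
    .persist(StorageLevel.MEMORY_AND_DISK)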

A few suggestions:

  1. Increasing the driver memory is a partial solution, which you have already implemented. If you are using JDK 8, use the G1GC collector to manage large heaps.

  2. You can experiment with the storage levels (MEMORY_AND_DISK, OFF_HEAP, etc.) to fine-tune them for your application (see the sketch after this list).
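
For instance (a sketch; the numbers are placeholders to tune for your own cluster, not recommendations):

// Driver memory and GC flags have to be set before the driver JVM starts,
// typically via spark-submit, e.g.:
//   --driver-memory 8g
//   --conf spark.driver.extraJavaOptions=-XX:+UseG1GC
//   --conf spark.executor.extraJavaOptions=-XX:+UseG1GC

// Storage levels can be switched per RDD without other code changes:
candidates.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized, smaller in memory
// or, if off-heap storage is configured for your Spark version:
// candidates.persist(StorageLevel.OFF_HEAP)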

Check out the official documentation guide for more details.