Spark 1.5.2 Shuffle/Serialization - 运行 内存不足

Spark 1.5.2 Shuffle/Serialization - running out of memory

我正在处理数百 GB 的数据集(大约 2B 行)。其中一项操作是将 RDD 或 scala case 对象(包含双打、映射、集合)缩减为单个实体。最初我的操作正在执行 groupByKey 但它很慢并且正在执行高 GC。所以我尝试将其转换为 aggregateByKey,后来甚至转换为 reduceByKey,希望避免高用户内存分配、洗牌 activity 和我在使用 groupBy 时遇到的高 gc 问题。

应用程序资源: 23GB exec mem + 4GB 开销。 20 个实例,每个实例 6 个核心。从 0.2 到 0.4

玩随机比

可用集群资源 10 个节点,yarn 总共 600GB,最大容器大小 32GB

2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5] org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xa8147f0c, /10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
        at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
        at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
        at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
        at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
        at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
        at akka.remote.EndpointWriter$$anonfun.applyOrElse(Endpoint.scala:718)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-2] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
                                                                                                                                                                                                                              67247,1       99%

关于工作 读取具有大约 20 个字段的输入数据集。 1B-2B。创建一个聚合超过 10 个唯一字段的输出数据集。这基本上成为查询条件。然而,在这 10 个中,有 3 个字段代表它们的各种组合,因此我们不必查询多条记录来获取一组。在这 3 个字段中,让 sat a、b 和 c 每个都有 11、2 和 2 个可能的值。所以我们可以获得给定键的最大 2^11 -1 * 2^2 - 1 * 2^2 -1 组合。

//pseudo code where I use aggregateByKey 

case class UserDataSet(salary: Double, members: Int, clicks: Map[Int, Long],
    businesses: Map[Int, Set[Int]])...) //About 10 fileds with 5 of them are maps

def main() = {

      create combinationRDD of type (String, Set[Set]) Rdd from input dataset which represent all combination
      create a joinedRdd of type (String, UserDataSet) - where key at this point already a final key which contains 10 unique fields; value is a UserDataSet

//This is where things fails
     val finalDataSet = joinedRdd.aggregateByKey(UserDataSet.getInstance())(processDataSeq, processDataMerge)

}    

private def processDataMerge(map1: UserDataSet, map2: UserDataSet) = { 

    map1.clicks ++= map2.clicks (deep merge of course to avoid overwriting of map keys)
    map1.salary += map2.salary

    map1
}

为了能够提供帮助,您应该 post 代码并给出输入数据的解释。

为什么是数据? 按键聚合时,要实现最佳并行性并避免出现问题,重要的是要了解键的分布以及基数。

让我解释一下它们是什么以及它们为什么重要。 假设您按国家/地区汇总...地球上大约有 250 个国家/地区,因此密钥的 基数 约为 250。

基数很重要,因为基数低可能会扼杀并行性。例如,如果您 90% 的数据是针对美国的,并且您有 250 个节点,那么一个节点将处理 90% 的数据。

这就引出了分布的概念,也就是说,当你按键分组时,每个键有多少个值就是你的值分布。为了获得最佳并行性,理想情况下您希望每个键的值数量大致相同。

现在,如果您的数据的基数非常高,但值分布不是最优的,从统计学上讲应该是平衡的。 例如,假设您有 apache 日志,其中大多数用户只访问了几个页面,但有些用户访问了很多(机器人就是这种情况)。 如果用户数量远大于节点数量,则拥有大量数据的用户会分布在节点周围,因此并行性不会受到太大影响。

当您使用基数较低的密钥时,通常会出现问题。 如果值的分布不佳,则不太可能导致洗衣机不平衡的问题。

最后但同样重要的是,它还很大程度上取决于您在 aggregateByKey 上所做的事情。如果您在 map 或 reduce 处理阶段泄漏对象,则很容易耗尽内存。

所以问题确实是驱动程序 运行 内存不足,而不是执行程序。因此错误出现在驱动程序日志中。呃。但是从日志中不是很清楚。驱动程序 运行 退出,因为 1) 它使用默认的 -Xmx900m 2) Spark 驱动程序依赖于 akka 库,而 akka 库依赖于顽固的 JavaSerializer,它使用字节数组而不是流来序列化对象。作为临时解决方案,我将 spark.driver.memory 增加到 4096m,此后我再也没有看到内存错误。感谢大家对问题的一些见解 space。