Spark 1.5.2 Shuffle/Serialization - 运行 内存不足
Spark 1.5.2 Shuffle/Serialization - running out of memory
我正在处理数百 GB 的数据集(大约 2B 行)。其中一项操作是将 RDD 或 scala case 对象(包含双打、映射、集合)缩减为单个实体。最初我的操作正在执行 groupByKey
但它很慢并且正在执行高 GC。所以我尝试将其转换为 aggregateByKey
,后来甚至转换为 reduceByKey
,希望避免高用户内存分配、洗牌 activity 和我在使用 groupBy 时遇到的高 gc 问题。
应用程序资源: 23GB exec mem + 4GB 开销。 20 个实例,每个实例 6 个核心。从 0.2 到 0.4
玩随机比
可用集群资源 10 个节点,yarn 总共 600GB,最大容器大小 32GB
2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5] org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xa8147f0c, /10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at akka.remote.EndpointWriter$$anonfun.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-2] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
67247,1 99%
关于工作
读取具有大约 20 个字段的输入数据集。 1B-2B。创建一个聚合超过 10 个唯一字段的输出数据集。这基本上成为查询条件。然而,在这 10 个中,有 3 个字段代表它们的各种组合,因此我们不必查询多条记录来获取一组。在这 3 个字段中,让 sat a、b 和 c 每个都有 11、2 和 2 个可能的值。所以我们可以获得给定键的最大 2^11 -1 * 2^2 - 1 * 2^2 -1 组合。
//pseudo code where I use aggregateByKey
case class UserDataSet(salary: Double, members: Int, clicks: Map[Int, Long],
businesses: Map[Int, Set[Int]])...) //About 10 fileds with 5 of them are maps
def main() = {
create combinationRDD of type (String, Set[Set]) Rdd from input dataset which represent all combination
create a joinedRdd of type (String, UserDataSet) - where key at this point already a final key which contains 10 unique fields; value is a UserDataSet
//This is where things fails
val finalDataSet = joinedRdd.aggregateByKey(UserDataSet.getInstance())(processDataSeq, processDataMerge)
}
private def processDataMerge(map1: UserDataSet, map2: UserDataSet) = {
map1.clicks ++= map2.clicks (deep merge of course to avoid overwriting of map keys)
map1.salary += map2.salary
map1
}
为了能够提供帮助,您应该 post 代码并给出输入数据的解释。
为什么是数据?
按键聚合时,要实现最佳并行性并避免出现问题,重要的是要了解键的分布以及基数。
让我解释一下它们是什么以及它们为什么重要。
假设您按国家/地区汇总...地球上大约有 250 个国家/地区,因此密钥的 基数 约为 250。
基数很重要,因为基数低可能会扼杀并行性。例如,如果您 90% 的数据是针对美国的,并且您有 250 个节点,那么一个节点将处理 90% 的数据。
这就引出了分布的概念,也就是说,当你按键分组时,每个键有多少个值就是你的值分布。为了获得最佳并行性,理想情况下您希望每个键的值数量大致相同。
现在,如果您的数据的基数非常高,但值分布不是最优的,从统计学上讲应该是平衡的。
例如,假设您有 apache 日志,其中大多数用户只访问了几个页面,但有些用户访问了很多(机器人就是这种情况)。
如果用户数量远大于节点数量,则拥有大量数据的用户会分布在节点周围,因此并行性不会受到太大影响。
当您使用基数较低的密钥时,通常会出现问题。
如果值的分布不佳,则不太可能导致洗衣机不平衡的问题。
最后但同样重要的是,它还很大程度上取决于您在 aggregateByKey 上所做的事情。如果您在 map 或 reduce 处理阶段泄漏对象,则很容易耗尽内存。
所以问题确实是驱动程序 运行 内存不足,而不是执行程序。因此错误出现在驱动程序日志中。呃。但是从日志中不是很清楚。驱动程序 运行 退出,因为 1) 它使用默认的 -Xmx900m 2) Spark 驱动程序依赖于 akka 库,而 akka 库依赖于顽固的 JavaSerializer,它使用字节数组而不是流来序列化对象。作为临时解决方案,我将 spark.driver.memory 增加到 4096m,此后我再也没有看到内存错误。感谢大家对问题的一些见解 space。
我正在处理数百 GB 的数据集(大约 2B 行)。其中一项操作是将 RDD 或 scala case 对象(包含双打、映射、集合)缩减为单个实体。最初我的操作正在执行 groupByKey
但它很慢并且正在执行高 GC。所以我尝试将其转换为 aggregateByKey
,后来甚至转换为 reduceByKey
,希望避免高用户内存分配、洗牌 activity 和我在使用 groupBy 时遇到的高 gc 问题。
应用程序资源: 23GB exec mem + 4GB 开销。 20 个实例,每个实例 6 个核心。从 0.2 到 0.4
玩随机比可用集群资源 10 个节点,yarn 总共 600GB,最大容器大小 32GB
2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5] org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xa8147f0c, /10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
at akka.remote.EndpointWriter$$anonfun$serializeMessage.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at akka.remote.EndpointWriter$$anonfun.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-2] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary.apply(Serializer.scala:129)
67247,1 99%
关于工作 读取具有大约 20 个字段的输入数据集。 1B-2B。创建一个聚合超过 10 个唯一字段的输出数据集。这基本上成为查询条件。然而,在这 10 个中,有 3 个字段代表它们的各种组合,因此我们不必查询多条记录来获取一组。在这 3 个字段中,让 sat a、b 和 c 每个都有 11、2 和 2 个可能的值。所以我们可以获得给定键的最大 2^11 -1 * 2^2 - 1 * 2^2 -1 组合。
//pseudo code where I use aggregateByKey
case class UserDataSet(salary: Double, members: Int, clicks: Map[Int, Long],
businesses: Map[Int, Set[Int]])...) //About 10 fileds with 5 of them are maps
def main() = {
create combinationRDD of type (String, Set[Set]) Rdd from input dataset which represent all combination
create a joinedRdd of type (String, UserDataSet) - where key at this point already a final key which contains 10 unique fields; value is a UserDataSet
//This is where things fails
val finalDataSet = joinedRdd.aggregateByKey(UserDataSet.getInstance())(processDataSeq, processDataMerge)
}
private def processDataMerge(map1: UserDataSet, map2: UserDataSet) = {
map1.clicks ++= map2.clicks (deep merge of course to avoid overwriting of map keys)
map1.salary += map2.salary
map1
}
为了能够提供帮助,您应该 post 代码并给出输入数据的解释。
为什么是数据? 按键聚合时,要实现最佳并行性并避免出现问题,重要的是要了解键的分布以及基数。
让我解释一下它们是什么以及它们为什么重要。 假设您按国家/地区汇总...地球上大约有 250 个国家/地区,因此密钥的 基数 约为 250。
基数很重要,因为基数低可能会扼杀并行性。例如,如果您 90% 的数据是针对美国的,并且您有 250 个节点,那么一个节点将处理 90% 的数据。
这就引出了分布的概念,也就是说,当你按键分组时,每个键有多少个值就是你的值分布。为了获得最佳并行性,理想情况下您希望每个键的值数量大致相同。
现在,如果您的数据的基数非常高,但值分布不是最优的,从统计学上讲应该是平衡的。 例如,假设您有 apache 日志,其中大多数用户只访问了几个页面,但有些用户访问了很多(机器人就是这种情况)。 如果用户数量远大于节点数量,则拥有大量数据的用户会分布在节点周围,因此并行性不会受到太大影响。
当您使用基数较低的密钥时,通常会出现问题。 如果值的分布不佳,则不太可能导致洗衣机不平衡的问题。
最后但同样重要的是,它还很大程度上取决于您在 aggregateByKey 上所做的事情。如果您在 map 或 reduce 处理阶段泄漏对象,则很容易耗尽内存。
所以问题确实是驱动程序 运行 内存不足,而不是执行程序。因此错误出现在驱动程序日志中。呃。但是从日志中不是很清楚。驱动程序 运行 退出,因为 1) 它使用默认的 -Xmx900m 2) Spark 驱动程序依赖于 akka 库,而 akka 库依赖于顽固的 JavaSerializer,它使用字节数组而不是流来序列化对象。作为临时解决方案,我将 spark.driver.memory 增加到 4096m,此后我再也没有看到内存错误。感谢大家对问题的一些见解 space。