Spark Container & Executor OOMs during `reduceByKey`

I'm running a Spark job on Amazon EMR in client mode with YARN, using PySpark, to process data from two input files totaling 200 GB in size.

The job joins the data together (using reduceByKey), does some mapping and filtering, and saves it in Parquet format to S3. While the job uses DataFrames for the save, all of our actual transformations and actions are performed on RDDs.

Note that I've detailed my current configuration and the values I've experimented with after the "Failures" section.

Code

The code relevant to the failure we're seeing happens in the reduceByKey step. I've included a few lines of context to show one prior map function and the save actions that actually trigger the reduceByKey on the RDD:

    # Populate UC Property Type
    united_rdd = united_rdd.map(converter.convert_uc_property_type(uc_property_type_mappings))

    # Reduce by listingIdSha
    united_rdd = united_rdd.reduceByKey(converter.merge_listings)

    # Filter by each geoId and write the output to storage
    schema = convert_struct(ListingRevision)
    for geo in GEO_NORMALIZATION_ENABLED_GEOS:
      regional_rdd = (united_rdd.filter(lambda (id_sha, (listing_revision, geo_id)): geo_id == geo)
                                .map(lambda (id_sha, (listing_revision, geo_id)):
                                     listing_revision))
      regional_df = regional_rdd.map(lambda obj: thrift_to_row(obj, schema)).toDF(schema)
      # Write to Disk/S3
      regional_df.write.format(output_format).mode("overwrite").save(os.path.join(output_dir, geo))
      # Write to Mongo
      (regional_df.write.format("com.mongodb.spark.sql.DefaultSource")
                        .option("spark.mongodb.output.uri", mongo_uri)
                        .option("collection",
                                "{}_{}".format(geo, config.MONGO_OUTPUT_COLLECTION_SUFFIX))
                        .mode("overwrite").save())

Failures

The job fails because the executors run out of physical memory. Multiple executors hit this failure, but here is one example, printed in the stderr of the EMR step and shown in the Spark History Server UI:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2787 in stage 3.0 failed 4 times,
 most recent failure: Lost task 2787.3 in stage 3.0 (TID 5792, ip-10-0-10-197.ec2.internal): 
 ExecutorLostFailure (executor 47 exited caused by one of the running tasks) 
 Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 20 GB physical memory used. 
 Consider boosting spark.yarn.executor.memoryOverhead.
 Driver stacktrace:
   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1442)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1441)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
   at scala.Option.foreach(Option.scala:257)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
   at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1923)
   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
   ... 29 more

After discovering this, I dug into the YARN and container logs on the individual nodes and found a YARN log message flagging the spike in physical memory usage, as well as a java.lang.OutOfMemoryError in the container logs (both included below, in that order).

The java.lang.OutOfMemoryError from the container logs:

17/03/28 21:41:44 WARN TransportChannelHandler: Exception in connection from ip-10-0-10-70.ec2.internal/10.0.10.70:7337
java.lang.OutOfMemoryError: Direct buffer memory
  at java.nio.Bits.reserveMemory(Bits.java:693)
  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
  at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
  at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
  at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
  at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
  at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
  at java.lang.Thread.run(Thread.java:745)

YARN flagging the extreme physical memory usage:

2017-03-28 21:42:48,986 INFO   org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 6310 for container-id container_1490736006967_0001_01_000015: 20.3 GB of 20 GB physical memory used; 24.9 GB of 100 GB virtual memory used

To summarize, I seem to run out of memory during the shuffle, even though I allocate half of each executor's memory to off-heap space and have tried a wide variety of executor memory settings and core counts. Is there anything else I'm missing that I could try? Based on some other helpful posts I've read, these appear to be the most common culprits for physical memory issues. Could data skew be causing this? I've tried measuring the partition distribution on smaller subsets of the data and it looked normal, but I can't do it for all of this job's data because the job never finishes.
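A minimal sketch of the kind of partition-distribution check I mean (the sample fraction here is arbitrary, and `glom` is just one way to count records per partition):

    # Rough sketch: count records per partition on a sample to eyeball skew.
    sampled = united_rdd.sample(False, 0.01)

    # glom() turns each partition into a list, so len() gives its record count.
    partition_sizes = sampled.glom().map(len).collect()

    print("partitions: {}".format(len(partition_sizes)))
    print("min/max/avg records per partition: {} / {} / {:.1f}".format(
        min(partition_sizes), max(partition_sizes),
        sum(partition_sizes) / float(len(partition_sizes))))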

Configuration

EMR spark-submit command:

spark-submit \
   --deploy-mode client /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
   --dataset_settings development \
   --mongo_uri <Internal Mongo URI> \
   --aws_access_key_id <S3 Access Key> \
   --aws_secret_key <S3 Secret Key> \
   --listing_normalization_server <Internal EC2 Address>:9502

Relevant Spark environment configuration:

- spark.executor.memory - 8 GB (each executor has 20 GB of memory available)
- spark.yarn.executor.memoryOverhead - 12 GB
- spark.executor.cores - 1 (the lowest value I've tried, in the hope that it would work)
- spark.default.parallelism - 1024 (automatically configured based on the other parameters; I've also tried 4099 to no avail)
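For reference, the same settings expressed as a PySpark SparkConf (a sketch only; in the actual job these come from spark-submit and the EMR cluster configuration rather than being set in code). Note that the 8 GB of executor heap plus 12 GB of memoryOverhead is exactly the 20 GB container limit YARN reports in the error above:

    from pyspark import SparkConf, SparkContext

    # Sketch of the configuration listed above; memoryOverhead is given in MB.
    conf = (SparkConf()
            .set("spark.executor.memory", "8g")
            .set("spark.yarn.executor.memoryOverhead", "12288")  # 8 GB heap + 12 GB overhead = 20 GB container
            .set("spark.executor.cores", "1")
            .set("spark.default.parallelism", "1024"))

    sc = SparkContext(conf=conf)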

I'm running with 64 m3.2xlarge machines, for a total of 1.41 TB of memory.

Note: I've experimented with a wide range of values for every memory parameter except driver memory, with no luck.

Update 1

I refactored my code to use a DataFrame join instead of an RDD union to combine the two input files. Once I did this, I made two important discoveries:

1. A rightOuter join, as opposed to our default leftOuter join, reduces our output size but solves the problem. Given that, I'm fairly sure the rightOuter join is excluding a small amount of skewed data. Unfortunately, I need to investigate further to determine whether the lost data matters; we're still exploring.

2. Using DataFrames led to a more obvious failure earlier in the process:

FetchFailed(BlockManagerId(80, ip-10-0-10-145.ec2.internal, 7337), shuffleId=2, mapId=35, reduceId=435, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3095111448
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
  at scala.collection.Iterator$$anon.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
  at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:328)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
  at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Too large frame: 3095111448
  at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
  at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
  at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
  at java.lang.Thread.run(Thread.java:745)

)

We fail during the shuffle because a single partition is fetching too much data, a 3 GB "frame".

I'll spend the rest of the day exploring how to eliminate the data skew and whether we can do a leftOuter join.
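A minimal sketch of one way to look for the heaviest keys (the sample fraction is arbitrary, and `united_rdd` is keyed by listingIdSha as in the code above):

    from operator import add

    # Rough sketch: count records per key on a sample to confirm skew.
    # Sampling keeps this cheap, since the full shuffle is what fails.
    key_counts = (united_rdd.sample(False, 0.01)
                            .map(lambda kv: (kv[0], 1))
                            .reduceByKey(add))

    # The 20 heaviest keys in the sample.
    for key, count in key_counts.top(20, key=lambda kc: kc[1]):
        print(key, count)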

In case anyone comes across this question: it turned out the problem was caused by data skew. I discovered this by switching the initial combination of the two input files to use a DataFrame join instead of an RDD union. That produced a more understandable error showing that our shuffle was failing while retrieving the data. To resolve it, I partitioned the data around an evenly distributed key, and then everything worked.
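A conceptual sketch of the shape of the fix, assuming two input DataFrames `left_df` and `right_df`, a join key `listing_id_sha`, and a placeholder column `even_key` standing in for the evenly distributed key (none of these names are our real schema):

    # Conceptual sketch only; column names are placeholders, not our schema.
    # Combine the two inputs with a DataFrame join instead of an RDD union...
    united_df = left_df.join(right_df, on="listing_id_sha", how="left_outer")

    # ...and spread the rows across partitions on an evenly distributed key,
    # so no single shuffle partition ends up holding a multi-GB frame.
    united_df = united_df.repartition(1024, "even_key")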