为什么 Spark 作业失败并显示 org.apache.spark.shuffle.MetadataFetchFailedException:在推测模式下缺少 shuffle 0 的输出位置?

Why do Spark jobs fail with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 in speculation mode?

我是 运行 处于推测模式的 Spark 作业。我有大约 500 个任务和大约 500 个 1 GB gz 压缩文件。我一直在处理每项工作,对于 1-2 项任务,附加错误会在之后重新运行数十次(阻止工作完成)。

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

知道问题的含义是什么以及如何克服它吗?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

我们在 Spark 中遇到了类似的错误,但我不确定它是否与您的问题有关。

我们对 100GB 数据使用了 JavaPairRDD.repartitionAndSortWithinPartitions,它一直失败,与您的应用类似。然后我们查看了特定节点上的 Yarn 日志,发现我们遇到了某种内存不足的问题,因此 Yarn 中断了执行。我们的解决方案是 change/add spark.shuffle.memoryFraction 0 in .../spark/conf/spark-defaults.conf。这使我们能够以这种方式处理更大(但不幸的是不是无限)的数据量。

当我为工作节点分配的内存超过它的内存时,我就遇到了这种情况。由于它没有交换,spark 在尝试存储用于洗牌的对象时崩溃,没有剩余内存。

解决方案是添加交换,或者将 worker/executor 配置为使用更少的内存,此外还对多个持久性文件使用 MEMORY_AND_DISK 存储级别。

我在 3 台机器的 YARN 集群上遇到了同样的问题。我一直在更换 RAM,但问题仍然存在。最后我在日志中看到以下消息:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms

在此之后,出现了这条信息:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67

我修改了spark-defaults.conf中的属性如下:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000

就是这样!之后我的工作成功完成。

在我的例子中(独立集群)抛出了异常,因为一些 Spark 从属的文件系统被 100% 填充了。删除从站 spark/work 文件夹中的所有内容解决了这个问题。

我解决了这个错误,增加了 executorMemory 和 driverMemory 中分配的内存。您可以在 HUE 中选择导致问题的 Spark 程序并在属性 -> 选项列表中执行此操作,您可以添加如下内容:

--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2

当然,参数的值会根据您集群的大小和您的需要而有所不同。

我遇到了同样的问题,但是我搜索了很多答案都不能解决我的问题。最终,我一步步调试我的代码。我发现每个分区的数据大小不平衡导致的问题,导致 MetadataFetchFailedExceptionmap 阶段而不是 reduce 阶段。在 reduceByKey()

之前做 df_rdd.repartition(nums)

对我来说,我对大数据(大约 50B 行)进行了一些窗口化处理,得到了

ExternalAppendOnlyUnsafeRowArray:54 - Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

在我的日志中。显然 4096 在这样的数据大小上可能很小......这让我想到了以下 JIRA:

https://issues.apache.org/jira/browse/SPARK-21595

最终到以下两个配置选项:

  • spark.sql.windowExec.buffer.spill.threshold
  • spark.sql.windowExec.buffer.in.memory.threshold

均默认为4096;我将它们提高了很多 (2097152),现在情况似乎还不错。我不是 100% 确定这与此处提出的问题相同,但这是另一回事。

在 Spark Web UI 中,如果有类似 Executors lost 的信息,那么您必须 查看yarn log,确认你的容器是否被kill掉

如果容器被杀死,可能是因为内存不足。

如何在纱线日志中找到关键信息?例如,可能会有这样的警告:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

在这种情况下,建议您增加spark.yarn.executor.memoryOverhead

当特定spark分区中有大量数据时会出现错误。解决此问题的方法是执行以下步骤:

  1. 增加随机分区的数量:--conf spark.sql.shuffle.partitions=
  2. 在正常情况下,分区数应设置为执行器数*每个执行器的核心数。但是如果我们有大量的数据,这种分区方案就会有问题。请参阅下面的示例。

假设我们有以下数据,并且我们有三个执行器,每个执行器有 1 个核心,那么在这种情况下分区(物理分区)的数量将为 3

 Data:  1,2,3,4,5,6,7,8,9,13,16,19,22

 Partitions:  1,2,3 
 Distribution of Data in Partitions (partition logic based on modulo by 3)

          1-> 1,4,7,13,16,19,22
          2-> 2,5,8
          3->3,6,9
 From above we can see that there is data skew, partition 1 is having more 
 data than the rest
 
 Now lets increase the number of partitions to : number of executors * number 
 of cores per executor*2 = 6 (in our example. These 6 partitions will be 
 logical partitions.Now each executor will be having 2 logical partitions 
 instead of 1 .Data partitioning will be based on modulo 6 instead of 3.

 Partitions of data in each executor:

        1->(0,1)->1,6,7,13,19
        2->(2,3)-->2,3,8,9
        3->(4,5)->4,5,16,22
The increase in logical partitions leads to fair partitioning.
  1. 增加shuffle分区数后接下来可以做的是 如果您不坚持或减少火花存储器的存储部分 缓存任何数据帧。默认情况下,存储部分为 0.5,执行部分 也是 0.5 。要减少存储部分,您可以在 spark-submit 中设置 命令以下配置

        --conf spark.memory.storageFraction=0.3
    

4.) 除了以上两点你还可以设置executor overhead memory。 --conf spark.executor.memoryOverhead=2g

 This is off-heap memory that is used for Virtual Machine overheads, interned 
 strings etc.

5.)除此之外,您还可以限制一次处理的文件数量 通过将 maxFilesPerTrigger 设置为更小的特定微批次 价值说 10.