为什么 Spark 作业失败并显示 org.apache.spark.shuffle.MetadataFetchFailedException:在推测模式下缺少 shuffle 0 的输出位置?
Why do Spark jobs fail with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 in speculation mode?
我是 运行 处于推测模式的 Spark 作业。我有大约 500 个任务和大约 500 个 1 GB gz 压缩文件。我一直在处理每项工作,对于 1-2 项任务,附加错误会在之后重新运行数十次(阻止工作完成)。
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
知道问题的含义是什么以及如何克服它吗?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
我们在 Spark 中遇到了类似的错误,但我不确定它是否与您的问题有关。
我们对 100GB 数据使用了 JavaPairRDD.repartitionAndSortWithinPartitions
,它一直失败,与您的应用类似。然后我们查看了特定节点上的 Yarn 日志,发现我们遇到了某种内存不足的问题,因此 Yarn 中断了执行。我们的解决方案是 change/add spark.shuffle.memoryFraction 0
in .../spark/conf/spark-defaults.conf
。这使我们能够以这种方式处理更大(但不幸的是不是无限)的数据量。
当我为工作节点分配的内存超过它的内存时,我就遇到了这种情况。由于它没有交换,spark 在尝试存储用于洗牌的对象时崩溃,没有剩余内存。
解决方案是添加交换,或者将 worker/executor 配置为使用更少的内存,此外还对多个持久性文件使用 MEMORY_AND_DISK 存储级别。
我在 3 台机器的 YARN 集群上遇到了同样的问题。我一直在更换 RAM,但问题仍然存在。最后我在日志中看到以下消息:
17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
在此之后,出现了这条信息:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
我修改了spark-defaults.conf中的属性如下:
spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
就是这样!之后我的工作成功完成。
在我的例子中(独立集群)抛出了异常,因为一些 Spark 从属的文件系统被 100% 填充了。删除从站 spark/work
文件夹中的所有内容解决了这个问题。
我解决了这个错误,增加了 executorMemory 和 driverMemory 中分配的内存。您可以在 HUE 中选择导致问题的 Spark 程序并在属性 -> 选项列表中执行此操作,您可以添加如下内容:
--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2
当然,参数的值会根据您集群的大小和您的需要而有所不同。
我遇到了同样的问题,但是我搜索了很多答案都不能解决我的问题。最终,我一步步调试我的代码。我发现每个分区的数据大小不平衡导致的问题,导致 MetadataFetchFailedException
在 map
阶段而不是 reduce
阶段。在 reduceByKey()
之前做 df_rdd.repartition(nums)
对我来说,我对大数据(大约 50B 行)进行了一些窗口化处理,得到了
ExternalAppendOnlyUnsafeRowArray:54
- Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
在我的日志中。显然 4096 在这样的数据大小上可能很小......这让我想到了以下 JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
最终到以下两个配置选项:
spark.sql.windowExec.buffer.spill.threshold
spark.sql.windowExec.buffer.in.memory.threshold
均默认为4096;我将它们提高了很多 (2097152),现在情况似乎还不错。我不是 100% 确定这与此处提出的问题相同,但这是另一回事。
在 Spark Web UI 中,如果有类似 Executors lost
的信息,那么您必须
查看yarn log,确认你的容器是否被kill掉
如果容器被杀死,可能是因为内存不足。
如何在纱线日志中找到关键信息?例如,可能会有这样的警告:
Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
在这种情况下,建议您增加spark.yarn.executor.memoryOverhead
。
当特定spark分区中有大量数据时会出现错误。解决此问题的方法是执行以下步骤:
- 增加随机分区的数量:--conf spark.sql.shuffle.partitions=
- 在正常情况下,分区数应设置为执行器数*每个执行器的核心数。但是如果我们有大量的数据,这种分区方案就会有问题。请参阅下面的示例。
假设我们有以下数据,并且我们有三个执行器,每个执行器有 1 个核心,那么在这种情况下分区(物理分区)的数量将为 3
Data: 1,2,3,4,5,6,7,8,9,13,16,19,22
Partitions: 1,2,3
Distribution of Data in Partitions (partition logic based on modulo by 3)
1-> 1,4,7,13,16,19,22
2-> 2,5,8
3->3,6,9
From above we can see that there is data skew, partition 1 is having more
data than the rest
Now lets increase the number of partitions to : number of executors * number
of cores per executor*2 = 6 (in our example. These 6 partitions will be
logical partitions.Now each executor will be having 2 logical partitions
instead of 1 .Data partitioning will be based on modulo 6 instead of 3.
Partitions of data in each executor:
1->(0,1)->1,6,7,13,19
2->(2,3)-->2,3,8,9
3->(4,5)->4,5,16,22
The increase in logical partitions leads to fair partitioning.
增加shuffle分区数后接下来可以做的是
如果您不坚持或减少火花存储器的存储部分
缓存任何数据帧。默认情况下,存储部分为 0.5,执行部分
也是 0.5 。要减少存储部分,您可以在 spark-submit 中设置
命令以下配置
--conf spark.memory.storageFraction=0.3
4.) 除了以上两点你还可以设置executor overhead memory。
--conf spark.executor.memoryOverhead=2g
This is off-heap memory that is used for Virtual Machine overheads, interned
strings etc.
5.)除此之外,您还可以限制一次处理的文件数量
通过将 maxFilesPerTrigger 设置为更小的特定微批次
价值说 10.
我是 运行 处于推测模式的 Spark 作业。我有大约 500 个任务和大约 500 个 1 GB gz 压缩文件。我一直在处理每项工作,对于 1-2 项任务,附加错误会在之后重新运行数十次(阻止工作完成)。
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
知道问题的含义是什么以及如何克服它吗?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
我们在 Spark 中遇到了类似的错误,但我不确定它是否与您的问题有关。
我们对 100GB 数据使用了 JavaPairRDD.repartitionAndSortWithinPartitions
,它一直失败,与您的应用类似。然后我们查看了特定节点上的 Yarn 日志,发现我们遇到了某种内存不足的问题,因此 Yarn 中断了执行。我们的解决方案是 change/add spark.shuffle.memoryFraction 0
in .../spark/conf/spark-defaults.conf
。这使我们能够以这种方式处理更大(但不幸的是不是无限)的数据量。
当我为工作节点分配的内存超过它的内存时,我就遇到了这种情况。由于它没有交换,spark 在尝试存储用于洗牌的对象时崩溃,没有剩余内存。
解决方案是添加交换,或者将 worker/executor 配置为使用更少的内存,此外还对多个持久性文件使用 MEMORY_AND_DISK 存储级别。
我在 3 台机器的 YARN 集群上遇到了同样的问题。我一直在更换 RAM,但问题仍然存在。最后我在日志中看到以下消息:
17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
在此之后,出现了这条信息:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
我修改了spark-defaults.conf中的属性如下:
spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
就是这样!之后我的工作成功完成。
在我的例子中(独立集群)抛出了异常,因为一些 Spark 从属的文件系统被 100% 填充了。删除从站 spark/work
文件夹中的所有内容解决了这个问题。
我解决了这个错误,增加了 executorMemory 和 driverMemory 中分配的内存。您可以在 HUE 中选择导致问题的 Spark 程序并在属性 -> 选项列表中执行此操作,您可以添加如下内容:
--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2
当然,参数的值会根据您集群的大小和您的需要而有所不同。
我遇到了同样的问题,但是我搜索了很多答案都不能解决我的问题。最终,我一步步调试我的代码。我发现每个分区的数据大小不平衡导致的问题,导致 MetadataFetchFailedException
在 map
阶段而不是 reduce
阶段。在 reduceByKey()
df_rdd.repartition(nums)
对我来说,我对大数据(大约 50B 行)进行了一些窗口化处理,得到了
ExternalAppendOnlyUnsafeRowArray:54
- Reached spill threshold of 4096 rows, switching toorg.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
在我的日志中。显然 4096 在这样的数据大小上可能很小......这让我想到了以下 JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
最终到以下两个配置选项:
spark.sql.windowExec.buffer.spill.threshold
spark.sql.windowExec.buffer.in.memory.threshold
均默认为4096;我将它们提高了很多 (2097152),现在情况似乎还不错。我不是 100% 确定这与此处提出的问题相同,但这是另一回事。
在 Spark Web UI 中,如果有类似 Executors lost
的信息,那么您必须
查看yarn log,确认你的容器是否被kill掉
如果容器被杀死,可能是因为内存不足。
如何在纱线日志中找到关键信息?例如,可能会有这样的警告:
Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
在这种情况下,建议您增加spark.yarn.executor.memoryOverhead
。
当特定spark分区中有大量数据时会出现错误。解决此问题的方法是执行以下步骤:
- 增加随机分区的数量:--conf spark.sql.shuffle.partitions=
- 在正常情况下,分区数应设置为执行器数*每个执行器的核心数。但是如果我们有大量的数据,这种分区方案就会有问题。请参阅下面的示例。
假设我们有以下数据,并且我们有三个执行器,每个执行器有 1 个核心,那么在这种情况下分区(物理分区)的数量将为 3
Data: 1,2,3,4,5,6,7,8,9,13,16,19,22
Partitions: 1,2,3
Distribution of Data in Partitions (partition logic based on modulo by 3)
1-> 1,4,7,13,16,19,22
2-> 2,5,8
3->3,6,9
From above we can see that there is data skew, partition 1 is having more
data than the rest
Now lets increase the number of partitions to : number of executors * number
of cores per executor*2 = 6 (in our example. These 6 partitions will be
logical partitions.Now each executor will be having 2 logical partitions
instead of 1 .Data partitioning will be based on modulo 6 instead of 3.
Partitions of data in each executor:
1->(0,1)->1,6,7,13,19
2->(2,3)-->2,3,8,9
3->(4,5)->4,5,16,22
The increase in logical partitions leads to fair partitioning.
增加shuffle分区数后接下来可以做的是 如果您不坚持或减少火花存储器的存储部分 缓存任何数据帧。默认情况下,存储部分为 0.5,执行部分 也是 0.5 。要减少存储部分,您可以在 spark-submit 中设置 命令以下配置
--conf spark.memory.storageFraction=0.3
4.) 除了以上两点你还可以设置executor overhead memory。 --conf spark.executor.memoryOverhead=2g
This is off-heap memory that is used for Virtual Machine overheads, interned
strings etc.
5.)除此之外,您还可以限制一次处理的文件数量 通过将 maxFilesPerTrigger 设置为更小的特定微批次 价值说 10.