处理大数据集时出现 FetchFailedException 或 MetadataFetchFailedException
FetchFailedException or MetadataFetchFailedException when processing big data set
当我 运行 使用 1 GB 数据集的解析代码时,它可以毫无错误地完成。但是,当我一次尝试 25 GB 的数据时,出现以下错误。我试图了解如何避免以下失败。很高兴听到任何建议或想法。
不同的错误,
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}
集群详细信息:
Yarn: 8 Nodes
Total cores: 64
Memory: 500 GB
Spark Version: 1.5
Spark 提交声明:
spark-submit --master yarn-cluster \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--executor-memory 4g \
--driver-memory 16g \
--num-executors 50 \
--deploy-mode cluster \
--executor-cores 1 \
--class my.parser \
myparser.jar \
-input xxx \
-output xxxx \
堆栈跟踪之一:
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
此错误几乎可以肯定是由执行程序上的内存问题引起的。我可以想出几种方法来解决这些类型的问题。
1) 您可以尝试 运行 使用更多分区(在 dataframe
上执行 repartition
)。当一个或多个分区包含的数据多于内存中容纳的数据时,通常会出现内存问题。
2) 我注意到您没有明确设置 spark.yarn.executor.memoryOverhead
,因此它将默认为 max(386, 0.10* executorMemory)
,在您的情况下为 400MB。这对我来说听起来很低。我会尝试将其增加到 1GB(请注意,如果将 memoryOverhead 增加到 1GB,则需要将 --executor-memory
降低到 3GB)
3) 查看故障节点上的日志文件。您要查找文本 "Killing container"。如果您看到文本 "running beyond physical memory limits",根据我的经验,增加 memoryOverhead 将解决问题。
通过将 Spark 超时 spark.network.timeout
增加到更大的值(例如 800),我也取得了一些不错的效果。默认的 120 秒会导致您的许多执行程序在负载过重时超时。
好吧,这是一个旧话题,Whosebug 上有很多答案,但我因为这个错误浪费了几天时间,我认为分享这个故事可能会有所帮助。
实际上有几种方法可以实现这一点。正如 Glennie 的出色回答所提到的,这很可能是内存问题,因此请确保您有足够的内存来 everything。有container-memory、AM内存、map-memory、reduce-memory等配置需要注意。阅读 this 对找到正确的配置有很大帮助。您应该自己选择数字,但这里有一些我设置的属性。
yarn-site.xml
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>32768</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>4096</value>
</property>
mapred-site.xml
<property>
<name>mapreduce.map.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
这些可以修复您可能 运行 遇到的一些其他错误,例如 PySpark shell 在启动时崩溃。但就我而言,虽然一些错误消失了(例如 MetadataFetchFailed 错误),但问题仍然存在。确切的错误是:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
DB-ETA-C/x.x.x.x:34085
在尝试了从 Spark 超时到 YARN shuffle 服务的所有可能的 YARN 和 Spark 属性 之后,我最终意识到在错误日志中失败的容器正在寻找 x.x.x.x
, 本地(内部)IP 而 运行ning netstat -tulpn | grep <PORT NUM>
返回 y.y.y.y:34085 其中 y.y.y.y 是 外部 IP 地址。这根本不是内存问题,只是网络配置问题。
Spark 服务仅绑定到外部接口 ,因为主机名与 /etc/hosts
中的外部 IP 相关联。更新 /etc/hosts
文件后,问题得到解决。
底线:
该错误显然表明某些容器无法到达另一个容器。这通常是由于内存问题导致容器失败,但也可能是网络问题,因此也要注意这些问题,尤其是当您的节点上有多个接口时。
除了上述内存和网络配置问题外,值得注意的是,对于大型表(例如,这里有几个 TB),org.apache.spark.shuffle.FetchFailedException 可能会由于检索随机分区超时而发生。要解决此问题,您可以设置以下内容:
SET spark.reducer.maxReqsInFlight=1; -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s; -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10;
如果所有随机播放任务都失败,则可能的原因可能是 netty 的依赖冲突。
从 spark-core 中排除 netty 依赖项对我有用。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
当我 运行 使用 1 GB 数据集的解析代码时,它可以毫无错误地完成。但是,当我一次尝试 25 GB 的数据时,出现以下错误。我试图了解如何避免以下失败。很高兴听到任何建议或想法。
不同的错误,
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}
集群详细信息:
Yarn: 8 Nodes
Total cores: 64
Memory: 500 GB
Spark Version: 1.5
Spark 提交声明:
spark-submit --master yarn-cluster \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--executor-memory 4g \
--driver-memory 16g \
--num-executors 50 \
--deploy-mode cluster \
--executor-cores 1 \
--class my.parser \
myparser.jar \
-input xxx \
-output xxxx \
堆栈跟踪之一:
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
此错误几乎可以肯定是由执行程序上的内存问题引起的。我可以想出几种方法来解决这些类型的问题。
1) 您可以尝试 运行 使用更多分区(在 dataframe
上执行 repartition
)。当一个或多个分区包含的数据多于内存中容纳的数据时,通常会出现内存问题。
2) 我注意到您没有明确设置 spark.yarn.executor.memoryOverhead
,因此它将默认为 max(386, 0.10* executorMemory)
,在您的情况下为 400MB。这对我来说听起来很低。我会尝试将其增加到 1GB(请注意,如果将 memoryOverhead 增加到 1GB,则需要将 --executor-memory
降低到 3GB)
3) 查看故障节点上的日志文件。您要查找文本 "Killing container"。如果您看到文本 "running beyond physical memory limits",根据我的经验,增加 memoryOverhead 将解决问题。
通过将 Spark 超时 spark.network.timeout
增加到更大的值(例如 800),我也取得了一些不错的效果。默认的 120 秒会导致您的许多执行程序在负载过重时超时。
好吧,这是一个旧话题,Whosebug 上有很多答案,但我因为这个错误浪费了几天时间,我认为分享这个故事可能会有所帮助。
实际上有几种方法可以实现这一点。正如 Glennie 的出色回答所提到的,这很可能是内存问题,因此请确保您有足够的内存来 everything。有container-memory、AM内存、map-memory、reduce-memory等配置需要注意。阅读 this 对找到正确的配置有很大帮助。您应该自己选择数字,但这里有一些我设置的属性。
yarn-site.xml
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>32768</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>4096</value>
</property>
mapred-site.xml
<property>
<name>mapreduce.map.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
这些可以修复您可能 运行 遇到的一些其他错误,例如 PySpark shell 在启动时崩溃。但就我而言,虽然一些错误消失了(例如 MetadataFetchFailed 错误),但问题仍然存在。确切的错误是:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to DB-ETA-C/x.x.x.x:34085
在尝试了从 Spark 超时到 YARN shuffle 服务的所有可能的 YARN 和 Spark 属性 之后,我最终意识到在错误日志中失败的容器正在寻找 x.x.x.x
, 本地(内部)IP 而 运行ning netstat -tulpn | grep <PORT NUM>
返回 y.y.y.y:34085 其中 y.y.y.y 是 外部 IP 地址。这根本不是内存问题,只是网络配置问题。
Spark 服务仅绑定到外部接口 ,因为主机名与 /etc/hosts
中的外部 IP 相关联。更新 /etc/hosts
文件后,问题得到解决。
底线: 该错误显然表明某些容器无法到达另一个容器。这通常是由于内存问题导致容器失败,但也可能是网络问题,因此也要注意这些问题,尤其是当您的节点上有多个接口时。
除了上述内存和网络配置问题外,值得注意的是,对于大型表(例如,这里有几个 TB),org.apache.spark.shuffle.FetchFailedException 可能会由于检索随机分区超时而发生。要解决此问题,您可以设置以下内容:
SET spark.reducer.maxReqsInFlight=1; -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s; -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10;
如果所有随机播放任务都失败,则可能的原因可能是 netty 的依赖冲突。 从 spark-core 中排除 netty 依赖项对我有用。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>