优化 Spark 资源以避免内存和 space 使用
Optimizing Spark resources to avoid memory and space usage
我有一个大约 190GB 的数据集,它被分成 1000 个分区。
我的 EMR 集群最多允许 10 个 r5a.2xlarge
TASK 节点和 2 个 CORE 节点。每个节点有 64GB 内存和 128GB EBS 存储。
在我的 spark 作业执行中,我将其设置为使用 executor-cores 5
、driver cores 5
、executor-memory 40g
、driver-memory 50g
、spark.yarn.executor.memoryOverhead=10g
、spark.sql.shuffle.partitions=500
, spark.dynamicAllocation.enabled=true
但是我的工作总是失败,出现类似
的错误
spark.shuffle.MetadataFetchFailedException
spark.shuffle.FetchFailedException
java.io.IOException: No space left on device
Container Lost
etc...
网上找的很多这类问题的答案都是说增加memoryOverhead。我做到了,从 2G 到 10G。我的 executor memory
和 memoryOverhead
一共是 50G。其中 40G 分配给执行程序,10G 分配给开销。但我认为我已经达到了极限,因为我将无法超过 56。
我认为我已尽一切可能优化我的 spark 工作:
- 增加分区
- 增加spark.sql.shuffle.partitions
- 增加执行器和开销内存
但是我的工作还是失败了。还有什么我可以尝试的吗?我是否应该进一步增加开销,以便我的执行程序 memory/overhead 内存为 50/50?
Ganglia 中我的工作的内存配置文件如下所示:
(陡峭的下降是当集群刷新所有执行节点时因为它们已经死了)
任何见解将不胜感激
谢谢
编辑:[解决方案]
我在我的 post 中附加了解决我问题的确切解决方案,这要感谢 Debuggerrr
根据他在他的回答中提出的建议。
- 我有一个很大的数据框,在做了很多次之后我正在重新使用它
在其他数据帧上的计算。通过使用
persist()
方法
(由 Debuggerrr 建议),我能够将其保存到 MEMORY 和 DISC
并简单地调用它而不用它的一部分被清理
GC.
- 我也按照他的回答中提到的最佳实践博客 Debuggerrr 计算了正确的执行程序内存、执行程序数量等。但是我没有做的是禁用
spark.dynamicAllocation.enabled
。该博客指出,如果我们手动计算资源,最好将 属性 设置为 false,因为如果您的计算不符合它,spark 往往会错误分配资源。一旦我将它设置为 false,并设置正确的 executor 和 spark 属性,它就像一个魅力!
[编辑 2]:
专门为我的工作工作的参数是:
--executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
您可以尝试以下任一步骤:
Memory overhead
应该是Executor内存的10%
或者328 MB
。不要将其增加到任何值。
- 移除驱动核心。
- 如果有10个节点,则指定
number of executors
。您必须以这样一种方式计算它,即为 YARN 和后台进程留下一些 space。另外,您可以尝试增加 1 或 2 个内核。
- 运行 它在
cluster
模式下,无论您分配给执行程序的编号是多少,都为其添加 +1,因为在集群模式下 1 个执行程序将被视为驱动程序执行程序。
- 此外,最后一件事就是您编写的用于提交/处理 190GB 文件的代码。仔细检查您的代码并找到优化它的方法。寻找收集方法,或不必要地使用连接、合并/重新分区。如果不需要,找一些替代品。
- 对您在代码中经常使用的数据帧使用持久化(仅限内存和磁盘)选项。
- 另外,我尝试的最后一件事是在
spark-shell on EMR
上手动执行这些步骤,您将知道代码的哪一部分花费了很多时间 运行。
您也可以参考此 official blog 了解一些提示。
我有一个大约 190GB 的数据集,它被分成 1000 个分区。
我的 EMR 集群最多允许 10 个 r5a.2xlarge
TASK 节点和 2 个 CORE 节点。每个节点有 64GB 内存和 128GB EBS 存储。
在我的 spark 作业执行中,我将其设置为使用 executor-cores 5
、driver cores 5
、executor-memory 40g
、driver-memory 50g
、spark.yarn.executor.memoryOverhead=10g
、spark.sql.shuffle.partitions=500
, spark.dynamicAllocation.enabled=true
但是我的工作总是失败,出现类似
的错误spark.shuffle.MetadataFetchFailedException
spark.shuffle.FetchFailedException
java.io.IOException: No space left on device
Container Lost
etc...
网上找的很多这类问题的答案都是说增加memoryOverhead。我做到了,从 2G 到 10G。我的 executor memory
和 memoryOverhead
一共是 50G。其中 40G 分配给执行程序,10G 分配给开销。但我认为我已经达到了极限,因为我将无法超过 56。
我认为我已尽一切可能优化我的 spark 工作:
- 增加分区
- 增加spark.sql.shuffle.partitions
- 增加执行器和开销内存
但是我的工作还是失败了。还有什么我可以尝试的吗?我是否应该进一步增加开销,以便我的执行程序 memory/overhead 内存为 50/50? Ganglia 中我的工作的内存配置文件如下所示:
(陡峭的下降是当集群刷新所有执行节点时因为它们已经死了)
任何见解将不胜感激
谢谢
编辑:[解决方案]
我在我的 post 中附加了解决我问题的确切解决方案,这要感谢 Debuggerrr
根据他在他的回答中提出的建议。
- 我有一个很大的数据框,在做了很多次之后我正在重新使用它
在其他数据帧上的计算。通过使用
persist()
方法 (由 Debuggerrr 建议),我能够将其保存到 MEMORY 和 DISC 并简单地调用它而不用它的一部分被清理 GC. - 我也按照他的回答中提到的最佳实践博客 Debuggerrr 计算了正确的执行程序内存、执行程序数量等。但是我没有做的是禁用
spark.dynamicAllocation.enabled
。该博客指出,如果我们手动计算资源,最好将 属性 设置为 false,因为如果您的计算不符合它,spark 往往会错误分配资源。一旦我将它设置为 false,并设置正确的 executor 和 spark 属性,它就像一个魅力!
[编辑 2]: 专门为我的工作工作的参数是:
--executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
您可以尝试以下任一步骤:
Memory overhead
应该是Executor内存的10%
或者328 MB
。不要将其增加到任何值。- 移除驱动核心。
- 如果有10个节点,则指定
number of executors
。您必须以这样一种方式计算它,即为 YARN 和后台进程留下一些 space。另外,您可以尝试增加 1 或 2 个内核。 - 运行 它在
cluster
模式下,无论您分配给执行程序的编号是多少,都为其添加 +1,因为在集群模式下 1 个执行程序将被视为驱动程序执行程序。 - 此外,最后一件事就是您编写的用于提交/处理 190GB 文件的代码。仔细检查您的代码并找到优化它的方法。寻找收集方法,或不必要地使用连接、合并/重新分区。如果不需要,找一些替代品。
- 对您在代码中经常使用的数据帧使用持久化(仅限内存和磁盘)选项。
- 另外,我尝试的最后一件事是在
spark-shell on EMR
上手动执行这些步骤,您将知道代码的哪一部分花费了很多时间 运行。
您也可以参考此 official blog 了解一些提示。