Spark 内存使用集中在 Driver / Master 上
Spark Memory Usage Concentrated on Driver / Master
我目前正在开发一个 Spark (v 2.2.0) Streaming 应用程序,我正在 运行 关注 Spark 似乎在集群中分配工作的方式。此应用程序使用客户端模式提交到 AWS EMR,因此有一个驱动程序节点和几个工作程序节点。这是 Ganglia 的屏幕截图,显示了过去一小时的内存使用情况:
Ganglia Screenshot
最左边的节点是"master"或"driver"节点,另外两个是worker节点。与通过流传入的工作负载相对应的所有三个节点的内存使用量均出现峰值,但峰值并不相等(即使按内存使用百分比进行缩放)。当大工作量进来时,driver 节点似乎超负荷工作,job 会崩溃并报内存错误:
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000053e980000, 674234368, 0) failed; error='Cannot allocate memory' (errno=12)
我也 运行 喜欢这个:
Exception in thread "streaming-job-executor-10" java.lang.OutOfMemoryError: Java heap space
当master运行s内存不足,这同样令人困惑,据我理解是"client"模式不会使用驱动程序/主节点作为执行者。
相关详情:
- 如前所述,本申请是以客户端方式提交的:
spark-submit --deploy-mode client --master yarn ...
.
- 程序中没有我运行宁
collect
或coalesce
- 我怀疑在单个节点上 运行 的任何工作(
jdbc
主要是阅读)在完成后是 repartition
。
- 内存中有几个非常非常小的数据集
persist
。
- 1 x 驱动程序规格:4 核,16GB RAM(m4.xlarge 实例)
- 2 x Worker 规格:4 核,30.5GB RAM(r3.xlarge 实例)
- 我试过允许 Spark 选择执行器大小/内核并手动指定它们。两种情况的行为相同。 (我手动指定了6个executors,1个core,9GB RAM)
我当然在这里不知所措。我不确定代码中发生了什么会触发驱动程序像这样占用工作量。
我唯一能想到的是类似于以下的代码片段:
val scoringAlgorithm = HelperFunctions.scoring(_: Row, batchTime)
val rawScored = dataToScore.map(scoringAlgorithm)
这里,一个函数正在从静态对象加载,并用于映射到 Dataset
。据我了解,Spark 将跨集群序列化此函数(回复:http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#passing-functions-to-spark)。然而,也许我错了,它只是 运行 在驱动程序上进行此转换。
如果有人对这个问题有任何见解,我很想听听!
我最终解决了这个问题。以下是我的处理方式:
我在陈述问题时做出了错误的断言:在Spark程序的开头有一个collect
语句。
我有一个交易需要 collect()
到 运行,因为它是设计的。我的假设是,对结果数据调用 repartition(n)
会将数据拆分回集群中的执行程序。据我所知,这种策略行不通。一旦我重写了这一行,Spark 就开始按照我的预期运行,并将工作分配给工作节点。
我对任何偶然发现这个问题的迷失灵魂的建议:不要 collect
除非它是你的 Spark 程序的结束。你无法从中恢复。寻找另一种方法来执行您的任务。 (我最终将 SQL 事务从 where col in (,,,)
语法切换到数据库上的连接。)
我目前正在开发一个 Spark (v 2.2.0) Streaming 应用程序,我正在 运行 关注 Spark 似乎在集群中分配工作的方式。此应用程序使用客户端模式提交到 AWS EMR,因此有一个驱动程序节点和几个工作程序节点。这是 Ganglia 的屏幕截图,显示了过去一小时的内存使用情况:
Ganglia Screenshot
最左边的节点是"master"或"driver"节点,另外两个是worker节点。与通过流传入的工作负载相对应的所有三个节点的内存使用量均出现峰值,但峰值并不相等(即使按内存使用百分比进行缩放)。当大工作量进来时,driver 节点似乎超负荷工作,job 会崩溃并报内存错误:
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000053e980000, 674234368, 0) failed; error='Cannot allocate memory' (errno=12)
我也 运行 喜欢这个:
Exception in thread "streaming-job-executor-10" java.lang.OutOfMemoryError: Java heap space
当master运行s内存不足,这同样令人困惑,据我理解是"client"模式不会使用驱动程序/主节点作为执行者。
相关详情:
- 如前所述,本申请是以客户端方式提交的:
spark-submit --deploy-mode client --master yarn ...
. - 程序中没有我运行宁
collect
或coalesce
- 我怀疑在单个节点上 运行 的任何工作(
jdbc
主要是阅读)在完成后是repartition
。 - 内存中有几个非常非常小的数据集
persist
。 - 1 x 驱动程序规格:4 核,16GB RAM(m4.xlarge 实例)
- 2 x Worker 规格:4 核,30.5GB RAM(r3.xlarge 实例)
- 我试过允许 Spark 选择执行器大小/内核并手动指定它们。两种情况的行为相同。 (我手动指定了6个executors,1个core,9GB RAM)
我当然在这里不知所措。我不确定代码中发生了什么会触发驱动程序像这样占用工作量。
我唯一能想到的是类似于以下的代码片段:
val scoringAlgorithm = HelperFunctions.scoring(_: Row, batchTime)
val rawScored = dataToScore.map(scoringAlgorithm)
这里,一个函数正在从静态对象加载,并用于映射到 Dataset
。据我了解,Spark 将跨集群序列化此函数(回复:http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#passing-functions-to-spark)。然而,也许我错了,它只是 运行 在驱动程序上进行此转换。
如果有人对这个问题有任何见解,我很想听听!
我最终解决了这个问题。以下是我的处理方式:
我在陈述问题时做出了错误的断言:在Spark程序的开头有一个collect
语句。
我有一个交易需要 collect()
到 运行,因为它是设计的。我的假设是,对结果数据调用 repartition(n)
会将数据拆分回集群中的执行程序。据我所知,这种策略行不通。一旦我重写了这一行,Spark 就开始按照我的预期运行,并将工作分配给工作节点。
我对任何偶然发现这个问题的迷失灵魂的建议:不要 collect
除非它是你的 Spark 程序的结束。你无法从中恢复。寻找另一种方法来执行您的任务。 (我最终将 SQL 事务从 where col in (,,,)
语法切换到数据库上的连接。)