集群规模大时 Spark 作业失败,规模小时成功
Spark job fails when cluster size is large, succeeds when small
我有一个 spark 作业,它接受三个输入并执行两个外部联接。数据采用键值格式(String、Array[String])。代码最重要的部分是:
val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")
我是 运行 EMR 上的工作,具有 r3.4xlarge 驱动程序节点和 500 个 m3.xlarge 工作程序节点。火花提交参数是:
spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2 --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s
更新:使用此设置,在 spark 作业中看到的执行程序数量 UI 为 500(每个节点一个)
我在驱动程序日志中看到的异常如下:
17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...
我尝试过的一些失败的事情:
- 我认为问题是因为产生了太多的执行程序,驱动程序有跟踪这些执行程序的开销。我尝试通过将执行程序内存增加到 4g 来减少执行程序的数量。这没有帮助。
- 我尝试将驱动程序的实例类型更改为 r3.8xlarge,这也没有帮助。
令人惊讶的是,当我将工作节点的数量减少到 300 个时,作业运行文件。有没有人对为什么会发生这种情况有任何其他假设?
好吧,理解 Spark 的分配是如何工作的有点问题。
根据您的信息,您有 500 个节点,每个节点有 4 个核心。所以,你有 4000 个核心。你对你的请求所做的是创建 4000 个执行器,每个执行器有 3 个核心。这意味着您正在为您的集群请求 12000 个内核,但没有那样的东西。
这个RPC超时的错误与你在同一台机器上启动了多少个jvm有规律地相关联,而这台机器由于同时发生了很多事情而无法及时响应。
你需要知道,--num-executors
最好与你的节点相关联,核心数应该与你在每个节点中拥有的核心相关联。
例如,m3.xLarge 的配置是 4 个内核和 15 Gb 的 RAM。 运行 最好的配置是什么?这取决于你打算做什么。看看你是否打算 运行 只做一份工作 我建议你这样设置:
spark-submit --deploy-mode client --master yarn-client --executor-memory 10g --executor-cores 4 --num-executors 500 --conf spark.default.parallelism=2000 --conf spark.yarn.executor.memoryOverhead=4000
这将使你的工作 运行 很好,如果你没有问题将你的数据适合你的工人最好将 default.parallelism
更改为 2000 否则你将失去很多随机播放的时间。
但是,我认为您可以做的最好的方法是保留 EMR 默认启用的动态分配,只需设置核心数、并行度和内存,您的工作就会 运行魅力。
我尝试了很多配置,一次修改一个参数,有 500 个节点。通过将 HashPartitioner 中的分区数从 8000 减少到 3000,我终于得到了工作。
val partitioner = new HashPartitioner(3000)
因此,当有更多分区时,驱动程序可能会被大量必须完成的随机播放所淹没,因此较低的分区会有所帮助。
我有一个 spark 作业,它接受三个输入并执行两个外部联接。数据采用键值格式(String、Array[String])。代码最重要的部分是:
val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")
我是 运行 EMR 上的工作,具有 r3.4xlarge 驱动程序节点和 500 个 m3.xlarge 工作程序节点。火花提交参数是:
spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2 --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s
更新:使用此设置,在 spark 作业中看到的执行程序数量 UI 为 500(每个节点一个)
我在驱动程序日志中看到的异常如下:
17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...
我尝试过的一些失败的事情:
- 我认为问题是因为产生了太多的执行程序,驱动程序有跟踪这些执行程序的开销。我尝试通过将执行程序内存增加到 4g 来减少执行程序的数量。这没有帮助。
- 我尝试将驱动程序的实例类型更改为 r3.8xlarge,这也没有帮助。
令人惊讶的是,当我将工作节点的数量减少到 300 个时,作业运行文件。有没有人对为什么会发生这种情况有任何其他假设?
好吧,理解 Spark 的分配是如何工作的有点问题。
根据您的信息,您有 500 个节点,每个节点有 4 个核心。所以,你有 4000 个核心。你对你的请求所做的是创建 4000 个执行器,每个执行器有 3 个核心。这意味着您正在为您的集群请求 12000 个内核,但没有那样的东西。
这个RPC超时的错误与你在同一台机器上启动了多少个jvm有规律地相关联,而这台机器由于同时发生了很多事情而无法及时响应。
你需要知道,--num-executors
最好与你的节点相关联,核心数应该与你在每个节点中拥有的核心相关联。
例如,m3.xLarge 的配置是 4 个内核和 15 Gb 的 RAM。 运行 最好的配置是什么?这取决于你打算做什么。看看你是否打算 运行 只做一份工作 我建议你这样设置:
spark-submit --deploy-mode client --master yarn-client --executor-memory 10g --executor-cores 4 --num-executors 500 --conf spark.default.parallelism=2000 --conf spark.yarn.executor.memoryOverhead=4000
这将使你的工作 运行 很好,如果你没有问题将你的数据适合你的工人最好将 default.parallelism
更改为 2000 否则你将失去很多随机播放的时间。
但是,我认为您可以做的最好的方法是保留 EMR 默认启用的动态分配,只需设置核心数、并行度和内存,您的工作就会 运行魅力。
我尝试了很多配置,一次修改一个参数,有 500 个节点。通过将 HashPartitioner 中的分区数从 8000 减少到 3000,我终于得到了工作。
val partitioner = new HashPartitioner(3000)
因此,当有更多分区时,驱动程序可能会被大量必须完成的随机播放所淹没,因此较低的分区会有所帮助。