集群规模大时 Spark 作业失败,规模小时成功

Spark job fails when cluster size is large, succeeds when small

我有一个 spark 作业,它接受三个输入并执行两个外部联接。数据采用键值格式(String、Array[String])。代码最重要的部分是:

val partitioner = new HashPartitioner(8000)
val joined = inputRdd1.fullOuterJoin(inputRdd2.fullOuterJoin(inputRdd3, partitioner), partitioner).cache
saveAsSequenceFile(joined, filter="X")
saveAsSequenceFile(joined, filter="Y")

我是 运行 EMR 上的工作,具有 r3.4xlarge 驱动程序节点和 500 个 m3.xlarge 工作程序节点。火花提交参数是:

spark-submit --deploy-mode client --master yarn-client --executor-memory 3g --driver-memory 100g --executor-cores 3 --num-executors 4000 --conf spark.default.parallelism=8000 --conf spark.storage.memoryFraction=0.1 --conf spark.shuffle.memoryFraction=0.2  --conf spark.yarn.executor.memoryOverhead=4000 --conf spark.network.timeout=600s

更新:使用此设置,在 spark 作业中看到的执行程序数量 UI 为 500(每个节点一个)

我在驱动程序日志中看到的异常如下:

17/10/13 21:37:57 WARN HeartbeatReceiver: Removing executor 470 with no recent heartbeats: 616136 ms exceeds timeout 600000 ms
17/10/13 21:39:04 ERROR ContextCleaner: Error cleaning broadcast 5
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
...

我尝试过的一些失败的事情:

令人惊讶的是,当我将工作节点的数量减少到 300 个时,作业运行文件。有没有人对为什么会发生这种情况有任何其他假设?

好吧,理解 Spark 的分配是如何工作的有点问题。

根据您的信息,您有 500 个节点,每个节点有 4 个核心。所以,你有 4000 个核心。你对你的请求所做的是创建 4000 个执行器,每个执行器有 3 个核心。这意味着您正在为您的集群请求 12000 个内核,但没有那样的东西。

这个RPC超时的错误与你在同一台机器上启动了多少个jvm有规律地相关联,而这台机器由于同时发生了很多事情而无法及时响应。

你需要知道,--num-executors最好与你的节点相关联,核心数应该与你在每个节点中拥有的核心相关联。

例如,m3.xLarge 的配置是 4 个内核和 15 Gb 的 RAM。 运行 最好的配置是什么?这取决于你打算做什么。看看你是否打算 运行 只做一份工作 我建议你这样设置:

spark-submit --deploy-mode client --master yarn-client --executor-memory 10g --executor-cores 4 --num-executors 500 --conf spark.default.parallelism=2000 --conf spark.yarn.executor.memoryOverhead=4000

这将使你的工作 运行 很好,如果你没有问题将你的数据适合你的工人最好将 default.parallelism 更改为 2000 否则你将失去很多随机播放的时间。

但是,我认为您可以做的最好的方法是保留 EMR 默认启用的动态分配,只需设置核心数、并行度和内存,您的工作就会 运行魅力。

我尝试了很多配置,一次修改一个参数,有 500 个节点。通过将 HashPartitioner 中的分区数从 8000 减少到 3000,我终于得到了工作。

val partitioner = new HashPartitioner(3000)

因此,当有更多分区时,驱动程序可能会被大量必须完成的随机播放所淹没,因此较低的分区会有所帮助。