为什么 EMR 上的 Yarn 不将所有节点分配给 运行 Spark 作业?

Why does Yarn on EMR not allocate all nodes to running Spark jobs?

我 运行 在 Amazon Elastic Map Reduce (EMR) 上的 Apache Spark 工作。目前我 运行 使用 emr-4.1.0,其中包括 Amazon Hadoop 2.6.0 和 Spark 1.5.0。

当我开始作业时,YARN 已正确地将所有工作节点分配给 spark 作业(当然有一个用于驱动程序)。

我把魔法"maximizeResourceAllocation"属性设为"true",火花属性"spark.dynamicAllocation.enabled"也设为"true"。

但是,如果我通过将节点添加到工作机器的 CORE 池来调整 emr 集群的大小,YARN 只会将 一些 新节点添加到 spark 作业。

例如,今天早上我有一份工作使用 26 个节点(m3.2xlarge,如果重要的话)- 1 个用于驱动程序,25 个执行程序。我想加快工作速度,所以我尝试再添加 8 个节点。 YARN 已拾取所有新节点,但仅将其中的 1 个分配给 Spark 作业。 Spark 确实成功地获取了新节点并将其用作执行器,但我的问题是为什么 YARN 让其他 7 个节点闲置?

这很烦人,原因很明显 - 我必须为这些资源付费,即使它们没有被使用,而且我的工作速度一点也没有加快!

有人知道 YARN 如何决定何时向 运行 spark 作业添加节点吗?哪些变量起作用?记忆? V 核?有什么吗?

提前致谢!

好的,在 @sean_r_owen 的帮助下,我能够找到它。

问题是这样的:当将 spark.dynamicAllocation.enabled 设置为 true 时,不应设置 spark.executor.instances - 一个明确的值将覆盖动态分配并将其关闭。原来自己不设置的话,EMR是在后台设置的。要获得所需的行为,您需要将 spark.executor.instances 显式设置为 0.

作为记录,这是我们在创建 EMR 集群时传递给 --configurations 标志的文件之一的内容:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },

    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0"
        }
    } 
]

这为我们提供了一个 EMR 集群,其中 Spark 在 运行 个作业时使用所有节点,包括添加的节点。它还似乎使用了 all/most 内存和所有(?)内核。

(我不完全确定它使用了所有实际内核;但它肯定使用了超过 1 个 VCore,以前不是这样,但是按照 Glennie Helles 的建议,它现在表现更好并且使用了一半列出的 VCores,这似乎等于实际的核心数...)

我使用 emr-5.20.0 在几乎相同的设置下观察到相同的行为。当集群已经 运行 但使用 TASK 节点(连同一个 CORE 节点)时,我没有尝试添加节点。我正在使用 InstanceFleets 来定义 MASTER、CORE 和 TASK 节点(使用 InstanceFleets 我不知道我得到了哪个确切的 InstanceTypes,这就是为什么我不想自己定义每个执行者的执行者、核心和内存的数量,但想要自动成为 maximized/optimized)。

有了这个,它只使用两个 TASK 节点(可能是前两个准备好使用的节点?)但在配置更多 TASK 节点并完成 bootstrap 阶段时永远不会扩展。

它在我的案例中起作用的原因是设置 spark.default.parallelism 参数(我的 TASK 节点的核心总数),它与用于 TargetOnDemandCapacity 或 TargetSpotCapacity 的数字相同任务实例舰队:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>
        }
    } 
]

为了完整起见:我使用一个CORE节点和几个TASK节点主要是为了确保集群至少有3个节点(1个MASTER,1个CORE和至少一个TASK节点)。在我尝试仅使用 CORE 节点之前,但在我的情况下,核心数量是根据实际任务计算的,最终可能会得到一个仅由一个 MASTER 节点和一个 CORE 节点组成的集群。使用 maximizeResourceAllocation 选项这样的集群永远运行什么都不做,因为执行者 运行 纱线应用程序主机完全占用了那个单一的 CORE 节点。