Spark Standalone 集群中的 workers、executors、cores 是什么?

What are workers, executors, cores in Spark Standalone cluster?

我读了Cluster Mode Overview,但我仍然无法理解Spark Standalone集群中的不同进程和并行性。

worker是不是JVM进程?我 运行 bin\start-slave.sh 发现它生成了 worker,它实际上是一个 JVM。

根据上述 link,执行程序是为 运行 任务的工作节点上的应用程序启动的进程。一个executor也是一个JVM。

这些是我的问题:

  1. 执行器是每个应用程序。那么工人的作用是什么?它是否与执行者 co-ordinate 并将结果传回给 driver? 还是 driver 直接与执行者对话?如果是这样,那么工人的目的是什么?

  2. 如何控制一个应用的executor数量?

  3. 是否可以在执行器中并行执行任务运行?如果可以,如何配置一个executor的线程数?

  4. worker、executor、executor core之间是什么关系(--total-executor-cores)?

  5. 每个节点有更多的worker是什么意思?

已更新

让我们举个例子来更好地理解。

示例 1: 具有 5 个工作节点的独立集群(每个节点有 8 个核心) 当我使用默认设置启动应用程序时。

示例 2 与示例 1 相同的集群配置,但我 运行 具有以下设置的应用程序 --executor-cores 10 --total-executor-cores 10.

示例 3 与示例 1 相同的集群配置,但我 运行 具有以下设置的应用程序 --executor-cores 10 --total-executor-cores 50.

示例 4 与示例 1 相同的集群配置,但我 运行 具有以下设置的应用程序 --executor-cores 50 --total-executor-cores 50.

示例 5 与示例 1 相同的集群配置,但我 运行 具有以下设置的应用程序 --executor-cores 50 --total-executor-cores 10.

在每个例子中, 多少执行者?每个执行者有多少个线程?几个核心? 每个应用程序的执行者数量是如何决定的?它总是与工人的数量相同吗?

Spark 使用 master/slave 架构。如图所示,它有一个中央协调器(Driver),与许多分布式工作者(执行者)进行通信。 driver 和每个执行者 运行 在他们自己的 Java 进程中。

DRIVER

driver是main方法所在的进程运行s。首先它将用户程序转换为任务,然后将任务调度到执行程序上。

执行者

执行器是工作节点的进程,负责运行给定 Spark 作业中的单个任务。它们在 Spark 应用程序开始时启动,通常 运行 持续应用程序的整个生命周期。一旦他们有了 运行 任务,他们就会将结果发送到 driver。它们还为用户程序通过块管理器缓存的 RDD 提供 in-memory 存储。

应用程序执行流程

考虑到这一点,当您使用 spark-submit 向集群提交应用程序时,内部会发生以下情况:

  1. 一个独立的应用程序启动并实例化一个 SparkContext 实例(只有在那时您才能调用该应用程序 driver)。
  2. driver 程序向集群管理器请求资源以启动执行程序。
  3. 集群管理器启动执行器。
  4. driver 处理 运行 通过用户应用程序。根据 RDD 上的操作和转换,任务被发送到执行程序。
  5. 执行者 运行 任务并保存结果。
  6. 如果有一个worker挂掉了,它的任务会被发送到不同的executor去重新处理。在书中 "Learning Spark: Lightning-Fast Big Data Analysis" 他们谈到了 Spark 和 Fault Tolerance:

Spark automatically deals with failed or slow machines by re-executing failed or slow tasks. For example, if the node running a partition of a map() operation crashes, Spark will rerun it on another node; and even if the node does not crash but is simply much slower than other nodes, Spark can preemptively launch a “speculative” copy of the task on another node, and take its result if that finishes.

  1. 使用 driver 中的 SparkContext.stop() 或如果主方法 exits/crashes 所有执行程序将被终止,集群资源将由集群管理器释放。

你的问题

  1. 当执行者启动时,它们会在 driver 中注册自己,然后它们会直接通信。工作人员负责向集群管理器传达其资源的可用性。

  2. 在 YARN 集群中,您可以使用 --num-executors 来做到这一点。在独立集群中,除非您使用 spark.executor.cores 并且一个工作人员有足够的内核来容纳多个执行程序,否则每个工作人员将获得一个执行程序。 (正如@JacekLaskowski 指出的那样,--num-executors 不再在 YARN https://github.com/apache/spark/commit/16b6d18613e150c7038c613992d80a7828413e66 中使用)

  3. 您可以使用 --executor-cores

  4. 分配每个执行程序的核心数
  5. --total-executor-cores 是每个应用程序的最大执行器内核数

  6. 正如 Sean Owen 在这篇 thread 中所说:"there's not a good reason to run more than one worker per machine"。例如,您会在一台机器上安装多个 JVM。

更新

我无法测试这种情况,但根据文档:

示例 1: Spark 将贪婪地获取调度程序提供的尽可能多的核心和执行程序。所以最后你会得到 5 个执行器,每个执行器有 8 个核心。

示例 2 到 5: Spark 无法在单个 worker 中分配尽可能多的内核,因此不会启动任何执行程序。

这是 Apache Spark 内部工作的方式: