Spark如何在多核或超线程机器上的一个任务中实现并行
How does Spark achieve parallelism within one task on multi-core or hyper-threaded machines
我一直在阅读并试图了解 Spark 框架如何在 Standalone 模式下使用其内核。根据 Spark 文档,参数“spark.task.cpus”的值默认设置为 1,这意味着为每个任务分配的核心数。
问题 1:
对于多核机器(例如,总共 4 个核心,8 个硬件线程),当 "spark.task.cpus = 4" 时,Spark 会使用 4 个核心(每个核心 1 个线程)还是 2 个带有超线程的核心?
如果我设置 "spark.task.cpus = 16" 超过本机可用硬件线程数,会发生什么情况?
问题 2:
这种硬件并行性是如何实现的?我试图查看代码,但找不到任何与硬件或 JVM 通信以实现核心级并行性的东西。比如任务是"filter"函数,那么单个filter任务是如何吐到多个核或者线程上的呢?
也许我遗漏了什么。这与 Scala 语言有关吗?
为了回答您的标题问题,Spark 本身并不能使您在任务中获得并行性。 spark.task.cpus
参数的主要目的是允许多线程任务。如果您在每个任务中调用外部多线程例程,或者您希望自己在任务级别封装最佳并行度,您可能需要将 spark.task.cpus
设置为大于 1。
不过,将此参数设置为大于 1 并不是您经常做的事情。
- 如果可用内核数少于任务所需的内核数,调度程序将不会启动任务,因此如果您的执行器有 8 个内核,并且您已将
spark.task.cpus
设置为 3,则仅将启动 2 个任务。
- 如果您的任务没有一直消耗内核的全部容量,您可能会发现使用
spark.task.cpus=1
并在任务中遇到一些争用仍然可以提高性能。
- GC 或 I/O 之类的开销可能不应该包含在
spark.task.cpus
设置中,因为它可能是静态成本,不会随着你的任务计数。
Question 1: For a multi-core machine (e.g., 4 cores in total, 8 hardware threads), when "spark.task.cpus = 4", will Spark use 4 cores (1 thread per core) or 2 cores with hyper-thread?
JVM 几乎总是依赖 OS 来为其提供与 CPU 一起工作的信息和机制,而 AFAIK Spark 在这里没有做任何特别的事情。如果 Runtime.getRuntime().availableProcessors()
或 ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors()
return 4 用于您的 dual-core HT-enabled Intel® 处理器,Spark 也将看到 4 个内核。
Question 2: How is this type of hardware parallelism achieved? I tried to look into the code but couldn't find anything that communicates with the hardware or JVM for core-level parallelism. For example, if the task is "filter" function, how is a single filter task spitted to multiple cores or threads?
如上所述,Spark 不会根据 spark.task.cpus
参数自动并行化任务。 Spark 主要是一个数据并行引擎,它的并行主要是通过将数据表示为 RDD 来实现的。
我一直在阅读并试图了解 Spark 框架如何在 Standalone 模式下使用其内核。根据 Spark 文档,参数“spark.task.cpus”的值默认设置为 1,这意味着为每个任务分配的核心数。
问题 1: 对于多核机器(例如,总共 4 个核心,8 个硬件线程),当 "spark.task.cpus = 4" 时,Spark 会使用 4 个核心(每个核心 1 个线程)还是 2 个带有超线程的核心?
如果我设置 "spark.task.cpus = 16" 超过本机可用硬件线程数,会发生什么情况?
问题 2: 这种硬件并行性是如何实现的?我试图查看代码,但找不到任何与硬件或 JVM 通信以实现核心级并行性的东西。比如任务是"filter"函数,那么单个filter任务是如何吐到多个核或者线程上的呢?
也许我遗漏了什么。这与 Scala 语言有关吗?
为了回答您的标题问题,Spark 本身并不能使您在任务中获得并行性。 spark.task.cpus
参数的主要目的是允许多线程任务。如果您在每个任务中调用外部多线程例程,或者您希望自己在任务级别封装最佳并行度,您可能需要将 spark.task.cpus
设置为大于 1。
不过,将此参数设置为大于 1 并不是您经常做的事情。
- 如果可用内核数少于任务所需的内核数,调度程序将不会启动任务,因此如果您的执行器有 8 个内核,并且您已将
spark.task.cpus
设置为 3,则仅将启动 2 个任务。 - 如果您的任务没有一直消耗内核的全部容量,您可能会发现使用
spark.task.cpus=1
并在任务中遇到一些争用仍然可以提高性能。 - GC 或 I/O 之类的开销可能不应该包含在
spark.task.cpus
设置中,因为它可能是静态成本,不会随着你的任务计数。
- 如果可用内核数少于任务所需的内核数,调度程序将不会启动任务,因此如果您的执行器有 8 个内核,并且您已将
Question 1: For a multi-core machine (e.g., 4 cores in total, 8 hardware threads), when "spark.task.cpus = 4", will Spark use 4 cores (1 thread per core) or 2 cores with hyper-thread?
JVM 几乎总是依赖 OS 来为其提供与 CPU 一起工作的信息和机制,而 AFAIK Spark 在这里没有做任何特别的事情。如果 Runtime.getRuntime().availableProcessors()
或 ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors()
return 4 用于您的 dual-core HT-enabled Intel® 处理器,Spark 也将看到 4 个内核。
Question 2: How is this type of hardware parallelism achieved? I tried to look into the code but couldn't find anything that communicates with the hardware or JVM for core-level parallelism. For example, if the task is "filter" function, how is a single filter task spitted to multiple cores or threads?
如上所述,Spark 不会根据 spark.task.cpus
参数自动并行化任务。 Spark 主要是一个数据并行引擎,它的并行主要是通过将数据表示为 RDD 来实现的。