multiprocessing.Pool 应使用多少个处理器?

How many processors should be used with multiprocessing.Pool?

我正在尝试并行使用 multiprocessing.Pool 到 运行 我的代码。要实例化 Pool,您必须设置进程数。我想弄清楚我应该为此设置多少。我知道这个数字不应该超过你拥有的核心数量,但我已经看到了不同的方法来确定你的系统有哪些可用。

2 种方法:

  1. multiprocessing.cpu_count()
  2. len(os.sched_getaffinity(0))

我有点困惑;两者有什么区别,哪个应该用Pool来实现?我在一个远程集群上工作,第一个,它输出有 128 cpu,但第二个给出 10.

两者的区别在doc中有明确说明:

multiprocessing.cpu_count() Return the number of CPUs in the system.

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).

因此,即使您使用的是 128 核系统,您的程序也可能以某种方式仅限于 运行 在 128 个可用 CPU 中的一组特定的 10 个上。由于亲缘关系也适用于子线程和进程,因此生成超过 10 个没有多大意义。但是,您可以在启动池之前尝试通过 os.sched_setaffinity() 增加可用 CPU 的数量。

import os
import multiprocessing as mp

cpu_count = mp.cpu_count() 

if len(os.sched_getaffinity(0)) < cpu_count:
    try:
        os.sched_setaffinity(0, range(cpu_count))
    except OSError:
        print('Could not set affinity')

n = max(len(os.sched_getaffinity(0)), 96)
print('Using', n, 'processes for the pool')

pool = mp.Pool(n)
# ...

另见 man 2 sched_setaffinity

首先,请记住 cpu_count() returns virtual CPU 的数量(这可以大于物理 CPU 的数量,以防每个 CPU 支持多线程。要查看物理 CPU 的数量,请使用:

psutil.cpu_count(logical=False)

无论如何,使用 psutil.cpu_count() 你会得到虚拟 CPU 的实际数量,这也是你的系统上可以拥有的最大可能并发线程数。

os.sched_getaffinity(0) # same as the default os.sched_getaffinity()

(其中 0 是当前进程)你得到 CPU 可用于 当前 进程的数量。您可以通过以下方式更改它:

os.sched_setaffinity(0,[1,2,3])

例如,您告诉进程使用 3 个 CPUs,即:123.

请注意,如果您将 Pool 设置为使用最大可用数量的 CPUs,您将无法获得最大并行度,因为某些 CPUs 将始终忙于操作系统.同样,在 multi-user 环境中,您可能无法达到池中线程数设置的并行度。

像 SLURM 或 YARN 这样的调度引擎可以保证进程获得一定数量的 CPUs,从而获得所需的并行度。

只是一些澄清:

如果您提交的任务是 100% CPU,即没有 I/O 或涉及网络等待,那么不仅池中的进程数不应超过核心数 可用,没有理由超过您将在任何时候提交的任务数。例如,如果您将 multiprocessing.pool.Pool.map 方法与包含 N 个元素的 iterable 一起使用,则池的大小应为 min(N, available_cores),其中可用核心数将由 len(os.sched_getaffinity(0)) 给出(参见 Marco Bonelli 给出的答案,但请注意 os.sched.getaffinity 方法未在 Windows 上实现)。

但是,如果除了 CPU-intensive 处理之外,您的工作函数还涉及 I/O and/or 网络等待,那么拥有一个更大的处理池肯定会让您受益比您可用的内核数量要多,因为进程将在处理的各个点处于等待状态。根据 CPU 和 I/O 的组合以及逻辑的排列方式,最佳解决方案可能是拥有一个多线程池 以及多处理池(其大小基于可用内核和任务数量),其中工作函数与多线程一起使用,但传递给多线程池,submit 调用多线程池进行 CPU-intensive 计算。

更新

下面的程序展示了四种方法,通过目录向下读取每个以 .py 结尾的文件,并从检索到的文本中计算一些值,并构建这些值的列表。处理已整齐地分为 I/O 处理和 CPU 处理功能。

如果您调用函数serial,那么所有处理都将连续进行,没有多处理。如果您调用函数 do_multiprocessing,那么将单独使用多处理来完成使用默认池大小的工作。但这也意味着所有文件读取也将基于该池大小进行并行化(或者我应该说“尝试并行化”?),这可能不太理想,特别是如果您没有固态驾驶。如果您调用函数 multithreading_and_multiprocessing,那么您选择的多线程池大小将用于执行文件读取处理和多处理池用于 CPU-intensive 计算。最后是 multiprocessing_cpu_only 版本,调用时仅使用多处理池进行 CPU-intensive 处理,主进程遍历目录,将所有任务提交到池中。如果多线程池大小为 1,则这在某种程度上等同于混合多线程池和多处理池示例,只是效率更高,因为它没有首先将任务提交到多线程队列的额外层。

from multiprocessing.pool import ThreadPool, Pool
from pathlib import Path
import time
from functools import partial


def cpu_processing(text, increment):
    total = len(text)
    for _ in range(2_000_000):
        total += increment
    return total

def serial():
    """
    Call serial() for serial processing.
    """

    t = time.time_ns()
    results = []
    for path in Path('.').glob('**/*.py'):
        text = path.read_text(encoding='utf-8')
        results.append(cpu_processing(text, 1))
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def process_path(path):
    text = path.read_text(encoding='utf-8')
    return cpu_processing(text, 1)

def do_multiprocessing():
    """
    Call do_multiprocessing for doing all processing with just
    a multiprocessing pool.
    """
    t = time.time_ns()
    mp_pool = Pool()
    results = mp_pool.map(process_path, Path('.').glob('**/*.py'))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def io_processing_parallel(mp_pool, path):
    text = path.read_text(encoding='utf-8')
    # Returns an AsyncResult instance:
    return mp_pool.apply_async(cpu_processing, args=(text, 1))

def multithreading_and_multiprocessing():
    """
    Call multithreading_and_multiprocessing to use a combination of
    multithreading and multiprocessing to have finer control over I/O concurrency.
    """
    t = time.time_ns()
    mp_pool = Pool()
    tp_pool = ThreadPool(2)
    worker = partial(io_processing_parallel, mp_pool)
    results = [async_result.get() for async_result in
               tp_pool.imap(worker, Path('.').glob('**/*.py'))]
    tp_pool.close()
    tp_pool.join()
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

def multiprocessing_cpu_only():
    """
    Call multiprocessing_cpu_only to use multiprocessing only for the
    CPU-intensive processing.
    """
    def get_texts():
        for path in Path('.').glob('**/*.py'):
            yield path.read_text(encoding='utf-8')

    t = time.time_ns()
    mp_pool = Pool()
    worker = partial(cpu_processing, increment=1)
    results = list(mp_pool.imap(worker, get_texts()))
    mp_pool.close()
    mp_pool.join()
    print(f'Elapsed time = {(time.time_ns() - t) / 1_000_000_000.0} sec., #results = {len(results)}')

if __name__ == '__main__':
    multithreading_and_multiprocessing()