os.sched_getaffinity(0) 对比 os.cpu_count()

os.sched_getaffinity(0) vs os.cpu_count()

所以,我知道标题中两种方法的区别,但不知道实际含义。

据我了解:如果您使用的 NUM_WORKERS 多于实际可用的内核,您将面临性能大幅下降,因为您的 OS 不断来回切换以保持并行。不知道这是多么真实,但我是在某处从比我聪明的人那里读到的。

os.cpu_count() 的文档中说:

Return the number of CPUs in the system. Returns None if undetermined. This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0))

所以,如果一个进程可以使用比“系统”更多的 CPU,我正在尝试弄清楚“系统”指的是什么。

我只想安全高效地实现 multiprocessing.pool 功能。所以这是我的问题总结:

有什么实际意义:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1

-1 是因为我发现如果我在处理数据时尝试工作,我的系统会少很多延迟。

multiprocessing.pool的实现使用了

        if processes is None:
            processes = os.cpu_count() or 1

不确定这是否回答了您的问题,但至少它是一个数据点。

这两个函数非常不同,NUM_WORKERS = os.sched_getaffinity(0) - 1 会立即失败并返回 TypeError,因为您试图从集合中减去一个整数。虽然 os.cpu_count() 告诉您系统有多少个核心,但 os.sched_getaffinity(pid) 告诉您在哪些核心上 thread/process 是 allowed 到 运行 .


os.cpu_count()

os.cpu_count() 显示 OS(虚拟 内核)已知的可用内核数。您很可能拥有这个数量的一半 physical 内核。如果使用比您拥有的物理内核更多的进程,甚至比虚拟内核更多的进程是有意义的,这在很大程度上取决于您在做什么。计算循环越紧密(指令的多样性很小,缓存未命中的情况很少,...),您就越有可能无法从更多使用的内核中获益(通过使用更多 worker-processes),甚至性能下降。

显然,这还取决于您的系统还 运行ning,因为您的系统试图为系统中的每个线程(作为进程的实际执行单元)公平分配 run-time 在可用内核上。因此,就您 应该 使用多少工人而言,无法一概而论。但是,例如,如果你有一个紧密的循环并且你的系统处于空闲状态,那么优化的一个很好的起点是

os.cpu_count() // 2 # same as mp.cpu_count() // 2 

...并从那里增加。

@Frank Yellin 已经提到,multiprocessing.Pool 默认使用 os.cpu_count() 作为工人数量。

os.sched_getaffinity(pid)

os.sched_getaffinity(pid)

Return the set of CPUs the process with PID pid (or the current process if zero) is restricted to.

现在 core/cpu/processor/-affinity 是关于您的线程(在您的 worker-process 内)允许 到 运行 的具体(虚拟)核心。您的 OS 为每个核心提供一个 id,从 0 到 (number-of-cores - 1) 并且更改亲和力允许限制(“固定”)某个线程允许在哪些实际核心上使用 运行 完全没有。

至少在 Linux 上,我发现这意味着如果 none 允许的内核当前可用,则 child-process 的线程不会 运行 ,即使其他 non-allowed 个核心处于空闲状态。所以“亲和力”在这里有点误导。

摆弄亲和力的目标是尽量减少 context-switches 和 core-migrations 的缓存失效。您在这里的 OS 通常具有更好的洞察力,并且已经尝试通过其 scheduling-policy 保持缓存“热”,因此除非您知道自己在做什么,否则您不能指望从干扰中轻松获益。

默认情况下,affinity 设置为所有内核,对于 multiprocessing.Pool,更改它并没有太大意义,至少如果您的系统空闲的话。

请注意,尽管此处的文档提到“过程”,但设置亲和力确实是一件 per-thread 的事情。因此,例如,在“当前进程如果为零”的“子”线程中设置亲和力,不会更改 main-thread 或进程中其他线程的亲和力。 但是、child-threads从main-thread和child-process继承它们的亲和力(通过它们的main-thread)从父进程继承亲和力main-thread。这会影响所有可能的 start-methods(“spawn”、“fork”、“forkserver”)。下面的示例演示了这一点以及如何使用 multiprocessing.Pool.

修改亲和力
import multiprocessing as mp
import threading
import os


def _location():
    return f"{mp.current_process().name} {threading.current_thread().name}"


def thread_foo():
    print(f"{_location()}, affinity before change: {os.sched_getaffinity(0)}")
    os.sched_setaffinity(0, {4})
    print(f"{_location()}, affinity after change: {os.sched_getaffinity(0)}")


def foo(_, iterations=200e6):

    print(f"{_location()}, affinity before thread_foo:"
          f" {os.sched_getaffinity(0)}")

    for _ in range(int(iterations)):  # some dummy computation
        pass

    t = threading.Thread(target=thread_foo)
    t.start()
    t.join()

    print(f"{_location()}, affinity before exit is unchanged: "
          f"{os.sched_getaffinity(0)}")

    return _


if __name__ == '__main__':

    mp.set_start_method("spawn")  # alternatives on Unix: "fork", "forkserver"

    # for current process, exclude cores 0,1 from affinity-mask
    print(f"parent affinity before change: {os.sched_getaffinity(0)}")
    excluded_cores = {0, 1}
    os.sched_setaffinity(0, os.sched_getaffinity(0).difference(excluded_cores))
    print(f"parent affinity after change: {os.sched_getaffinity(0)}")

    with mp.Pool(2) as pool:
        pool.map(foo, range(5))

输出:

parent affinity before change: {0, 1, 2, 3, 4, 5, 6, 7}
parent affinity after change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}

如果您有一个纯 100% CPU 绑定的任务,即除了计算什么都不做,那么显然 would/could 进程池大小大于 CPUs 在您的计算机上可用。但是,如果混合使用 I/O 进程将放弃 CPU 等待 I/O 完成(或者,例如 URL从网站返回,需要相对 的时间)?对我而言,您不清楚在这种情况下您是否无法通过超过 os.cpu_count().

的进程池大小来提高吞吐量

更新

这是演示这一点的代码。这段代码可能最好通过使用线程来提供,它正在使用进程。我的桌面上有 8 个内核。该程序只是同时检索 54 URL(或在本例中为并行检索)。该程序传递了一个参数,即要使用的池的大小。不幸的是,创建额外的进程会产生初始开销,因此如果创建太多进程,节省的成本就会开始下降。但是如果任务很长 运行 并且有很多 I/O,那么创建进程的开销最终是值得的:

from concurrent.futures import ProcessPoolExecutor, as_completed
import requests
from timing import time_it

def get_url(url):
    resp = requests.get(url, headers={'user-agent': 'my-app/0.0.1'})
    return resp.text


@time_it
def main(poolsize):
    urls = [
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
    ]
    with ProcessPoolExecutor(poolsize) as executor:
        futures = {executor.submit(get_url, url): url for url in urls}
        for future in as_completed(futures):
            text = future.result()
            url = futures[future]
            print(url, text[0:80])
            print('-' * 100)

if __name__ == '__main__':
    import sys
    main(int(sys.argv[1]))

8个进程:(我拥有的核心数):

func: main args: [(8,), {}] took: 2.316840410232544 sec.

16 个进程:

func: main args: [(16,), {}] took: 1.7964842319488525 sec.

24 个进程:

func: main args: [(24,), {}] took: 2.2560818195343018 sec.