为什么 multiprocessing 运行 东西在同一个进程中?

Why is multiprocessing running things in the same process?

我运行下面的解决方案来自How can I recover the return value of a function passed to multiprocessing.Process?

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

应该输出如下内容:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

但我只得到

[4212, 4212, 4212, 4212, 4212]

如果我使用超过 10 个进程提供 pool.map 1,000,000 的范围,我最多看到两个不同的 pid。

为什么我的 multiprocessing 副本似乎 运行 所有内容都在同一个进程中?

TL;DR: 任务没有以任何方式专门分配,也许你的任务太短了,它们在其他进程开始之前就已经完成了。

multiprocessing 的来源来看,任务似乎只是简单地放在 Queue 中,工作进程从中读取(函数 workerPool._inqueue).没有经过计算的分配,工人们只是竞相努力工作。

那么最有可能的赌注是,由于任务非常短,所以一个进程在其他进程有机会查看甚至开始之前完成所有任务。您可以通过向任务添加两秒 sleep 来轻松检查是否属于这种情况。

我会注意到,在我的机器上,所有任务都非常均匀地分布在进程中(对于#processes > #cores 也是如此)。所以似乎有一些系统依赖性,即使所有进程都应该在工作排队之前 .start()ed。


这是来自 worker 的一些修剪后的源代码,它表明任务只是由每个进程从队列中读取的,因此是伪随机顺序:

def worker(inqueue, outqueue, ...):
    ...
    get = inqueue.get
    ...
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        ...

SimpleQueue 使用 Pipe 在进程之间通信,来自 SimpleQueue 构造函数:

self._reader, self._writer = Pipe(duplex=False)

编辑:可能关于进程启动太慢的部分是错误的,所以我删除了它。在任何工作排队之前,所有进程都被 .start()ed(这可能是 platform 相关的)。我找不到进程是否就绪 .start() returns.