Python multiprocessing map mishandling of last processes

map behaves strangely when used with Python's multiprocessing.Pool. In the example below, a pool of 4 processors works on 28 tasks. This should take seven passes of 4 seconds each.

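However, it takes 8 passes. In the first six passes, all processors are busy. In the 7th pass, only two tasks are completed (two processors sit idle). The remaining 2 tasks are completed in the 8th pass (again with two idle processors). This behavior shows up for seemingly random combinations of CPU count and task count, and it wastes time unnecessarily.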

This example has been reproduced on an Intel Xeon Haswell (20 cores) and an Intel i7 (4 cores).

Any ideas on how to force Pool to use all available processors in every pass?

import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id+' '+now)
    a=values**2
    time.sleep(4)
    return a 

if __name__ == '__main__':
    p = Pool(4) #number of processes
    processed_values= p.map( f, range(28))
    p.close()
    p.join()
    print(processed_values)

The output of a run is as follows:

<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:49.604065
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:49.604189
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:49.604252
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:49.604866
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:53.608475
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:53.608878
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:53.608931
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:53.609503
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:57.612831
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:57.613135
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:57.613555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:57.614065
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:01.616974
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:01.617273
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:01.617699
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:01.618190
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:05.621284
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:05.621489
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:05.622130
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:05.622404
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:09.625522
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:09.625631
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:09.626555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:09.626566
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:13.629761
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:13.629846
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:17.634003
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:17.634317
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]

This is related to the following question, which has no clear or correct answer: Python: Multiprocessing Map takes longer to complete last few processes

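This is due to the way Pool.map splits the iterable you pass into chunks and sends those chunks to the workers in the Pool. If you force chunksize to 1, you'll see the behavior you expect: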

import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id+' '+now)
    a=values**2
    time.sleep(4)
    return a 

if __name__ == '__main__':
    p = Pool(4) #number of processes
    processed_values= p.map( f, range(28), chunksize=1)
    p.close()
    p.join()
    print(processed_values)

Output:

<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:06.548733
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:06.548803
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:06.549013
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:06.549052
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:10.549509
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:10.551091
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:10.553057
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:10.553263
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:14.553765
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:14.553821
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:14.554953
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:14.557262
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:18.556535
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:18.556611
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:18.558019
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:18.561597
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:22.560039
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:22.560097
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:22.562236
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:22.565912
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:26.564383
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:26.564430
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:26.564589
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:26.570232
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:30.568634
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:30.568647
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:30.568752
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:30.574456
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]

The algorithm map uses to pick the chunk size when you don't provide one looks like this:

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

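For an iterable of size 28 and a pool of 4 workers, that comes out to 2: divmod(28, 16) gives a quotient of 1 with a remainder of 12, so the quotient is bumped up to 2. This means each worker process takes two items from your iterable at a time instead of one. So when only four items are left in the queue, the first free worker gets two and the second free worker gets the other two, leaving nothing for the remaining two workers.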
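To make the arithmetic concrete, here is a minimal sketch that mirrors the heuristic quoted above and then estimates how many 4-second passes a run needs. The function names `default_chunksize` and `passes_needed` are made up for this illustration and are not part of multiprocessing. The model assumes that every task takes the same time and that a worker finishes its whole chunk before picking up the next one.

```python
import heapq


def default_chunksize(n_items, n_workers):
    """Mirror the quoted heuristic: aim for roughly 4 chunks per worker."""
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def passes_needed(n_items, n_workers, chunksize):
    """Estimate the run length in passes (one pass = one task duration).

    Assumption: chunks are handed out in order to whichever worker
    becomes free first, and all tasks take equally long.
    """
    if n_items == 0:
        return 0
    # Length of each chunk; only the last one can be shorter.
    chunk_lengths = [min(chunksize, n_items - start)
                     for start in range(0, n_items, chunksize)]
    free_at = [0] * n_workers  # time at which each worker becomes idle
    heapq.heapify(free_at)
    finish = 0
    for length in chunk_lengths:
        start = heapq.heappop(free_at)  # earliest available worker
        end = start + length
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


size = default_chunksize(28, 4)
print(size)                        # 2
print(passes_needed(28, 4, size))  # 8 passes, about 32 seconds
print(passes_needed(28, 4, 1))     # 7 passes, about 28 seconds
```

With a chunk size of 2, the 28 items become 14 chunks. Twelve of them keep all four workers busy for six passes, and the last two chunks occupy only two workers for passes 7 and 8, which matches the output in the question.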

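The primary reason for chunking is that it greatly improves performance on very large iterables by reducing IPC overhead. For smaller iterables it tends not to make much difference, and it can even hurt performance, as it does in this case.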
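As a rough illustration of the IPC saving, the sketch below counts how many chunks a given chunk size produces, on the assumption that each chunk costs roughly one task message to a worker and one result message back. The helper `chunk_count` is a hypothetical name used only here, and the one-million-item workload is an invented example.

```python
def chunk_count(n_items, chunksize):
    """Number of chunks the iterable is split into (ceiling division)."""
    if n_items == 0:
        return 0
    return (n_items + chunksize - 1) // chunksize


n_workers = 4
n_items = 1000000  # hypothetical large workload

# Same heuristic as the snippet quoted in the answer.
default, extra = divmod(n_items, n_workers * 4)
if extra:
    default += 1

print(default)                        # 62500
print(chunk_count(n_items, 1))        # 1000000 chunks with chunksize=1
print(chunk_count(n_items, default))  # 16 chunks with the default
```

For cheap tasks, a million round trips can easily cost more than the work itself, which is why the default batches items. For a handful of slow tasks, such as the 28 four-second tasks in the question, the messaging cost is negligible and chunksize=1 gives the better load balance.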