Why doesn't multiprocessing pool map speed up compared to serial map?

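I have this very simple Python code that I want to speed up by parallelizing it. However, no matter what I do, multiprocessing.Pool.map gains nothing over the standard map.

I've read other threads where people used it with very small functions that don't parallelize well and incur too much overhead, but I don't think that should be the case here.

Am I doing something wrong?

Here is the example: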

#!/usr/bin/python

import concurrent.futures
import multiprocessing
import time

import numpy

def AddNoise(sample):
    #time.sleep(0.001)
    return sample + numpy.random.randint(0,9,sample.shape)
    #return sample + numpy.ones(sample.shape)

# The guard is required when workers are started with "spawn" (the default on
# Windows and macOS), because each worker re-imports this module.
if __name__ == "__main__":
    n=100
    m=10000
    start = time.time()
    A = list([ numpy.random.randint(0,9,(n,n)) for i in range(m) ])
    print("creating %d numpy arrays of %d x %d took %.2f seconds"%(m,n,n,time.time()-start))

    for i in range(3):
        start = time.time()
        A = list(map(AddNoise, A))
        print("adding numpy arrays took %.2f seconds"%(time.time()-start))

    for i in range(3):
        start = time.time()
        with multiprocessing.Pool(processes=2) as pool:
            A = list(pool.map(AddNoise, A, chunksize=100))
        print("adding numpy arrays with multiprocessing Pool took %.2f seconds"%(time.time()-start))

    for i in range(3):
        start = time.time()
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
            A = list(executor.map(AddNoise, A))
        print("adding numpy arrays with concurrent.futures.ProcessPoolExecutor took %.2f seconds"%(time.time()-start))

On my otherwise idle 4-core/8-thread laptop, this produces the following output:

$ python test-pool.py 
creating 10000 numpy arrays of 100 x 100 took 1.54 seconds
adding numpy arrays took 1.65 seconds
adding numpy arrays took 1.51 seconds
adding numpy arrays took 1.51 seconds
adding numpy arrays with multiprocessing Pool took 1.99 seconds
adding numpy arrays with multiprocessing Pool took 1.98 seconds
adding numpy arrays with multiprocessing Pool took 1.94 seconds
adding numpy arrays with concurrent.futures.ProcessPoolExecutor took 3.32 seconds
adding numpy arrays with concurrent.futures.ProcessPoolExecutor took 3.17 seconds
adding numpy arrays with concurrent.futures.ProcessPoolExecutor took 3.25 seconds

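The problem is the transfer of the results. With multiprocessing, every array you create in a child process has to be pickled and sent back to the main process, which is overhead the serial map does not have. (The input arrays in A pay the same cost on the way to the workers.) Because adding noise to a 100 x 100 array is cheap, this copying can easily cost more than the computation you save by using two processes.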
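To put a rough number on that overhead, the snippet below pickles one array created the same way as in the question. multiprocessing.Pool serializes task arguments and results with pickle, so the pickled size is a reasonable proxy for what has to cross the process boundary; this is an estimate of data volume, not a measurement of the pipe itself. On a platform where NumPy's default integer is 64-bit (e.g. Linux), each 100 x 100 array holds 100 x 100 x 8 = 80,000 bytes, so one map call over 10,000 arrays moves roughly 800 MB in each direction.

```python
import pickle

import numpy

n, m = 100, 10000  # same sizes as in the question

# One input array, created the same way as in the question.
sample = numpy.random.randint(0, 9, (n, n))
pickled = pickle.dumps(sample, protocol=pickle.HIGHEST_PROTOCOL)

print("dtype: %s, raw data: %d bytes, pickled: %d bytes"
      % (sample.dtype, sample.nbytes, len(pickled)))

# Each array is pickled once on the way to a worker and once on the way back.
total_mb = 2 * m * len(pickled) / 1e6
print("estimated data moved per map call: ~%.0f MB" % total_mb)
```

The exact dtype (and therefore the size) depends on the platform and NumPy version, which is why the snippet prints it rather than assuming it.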

I checked this by modifying the AddNoise function as follows; it keeps the computation but no longer sends the result array back:

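def AddNoise(sample):
    # Same work as before, but only None is pickled back to the parent.
    # Keep A intact when timing: assign the map result to a new name (e.g. B).
    _ = sample + numpy.random.randint(0,9,sample.shape)
    return None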
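The same experiment can be sketched as a small self-contained benchmark. The names add_noise, add_noise_discard and time_map are hypothetical, chosen for this sketch; the sizes and chunksize=100 mirror the question, and as in the question the timing includes starting the pool. Unlike the original loops, the map results are not assigned back to A, so the discard variant (which returns None) does not replace the input arrays with None. If returning the arrays is a major part of the cost, the pooled time should drop for add_noise_discard, although the input arrays still have to be sent to the workers.

```python
import multiprocessing
import time

import numpy

def add_noise(sample):
    # Same work as the question's AddNoise: the full array is sent back.
    return sample + numpy.random.randint(0, 9, sample.shape)

def add_noise_discard(sample):
    # Same work, but only None is sent back (the modified AddNoise).
    _ = sample + numpy.random.randint(0, 9, sample.shape)
    return None

def time_map(func, data, processes=None):
    """Return (seconds, results) for func over data, serially or in a Pool."""
    start = time.perf_counter()
    if processes is None:
        results = list(map(func, data))
    else:
        with multiprocessing.Pool(processes=processes) as pool:
            results = pool.map(func, data, chunksize=100)
    return time.perf_counter() - start, results

if __name__ == "__main__":
    n, m = 100, 10000
    A = [numpy.random.randint(0, 9, (n, n)) for _ in range(m)]
    for func in (add_noise, add_noise_discard):
        # Results go to a throwaway name so A stays a list of arrays.
        serial, _ = time_map(func, A)
        pooled, _ = time_map(func, A, processes=2)
        print("%s: serial %.2f s, Pool(2) %.2f s"
              % (func.__name__, serial, pooled))
```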