numpy 数组上的简单多处理

simple mutiprocessing on numpy array

我想使用 multiprocessing 模块加快 u ** 2 的计算速度,其中 u 是一个 numpy 数组。

这是我的尝试(文件名multi.py):

# to compile on Windows/Ipython  : import multi  then  run -m multi

from multiprocessing import Pool
import numpy as np

if __name__ == '__main__':
 u=np.arange(6e7)
 def test(N):
    pool = Pool(N)
    v=len(u)//N
    tasks = [ u[k*v:(k+1)*v] for k in range(N)]  
    res = pool.map_async(np.square,tasks).get()
    return res

这是基准测试:

In [25]: %time  r1=test(1)
Wall time: 13.2 s

In [26]: %time  r2=test(2)
Wall time: 7.75 s

In [27]: %time  r4=test(4)
Wall time: 8.29 s

In [31]: %time r=u**2
Wall time: 512 ms

我的 PC 上有 2 个物理内核,所以测试 (2) 运行 比测试 (1) 快是令人鼓舞的。

但目前,numpy 更快。多处理增加了很大的过载。

所以我的问题是:如何(或是否可能)使用 multiprocessing 加速 u ** 2

编辑

我意识到所有的过程工作都是在他自己的内存中完成的space,因此必然会出现大量的复制(See here for example)。所以没有希望以这种方式加速简单的计算。

CPython 中的多处理本质上是昂贵的,因为 Global Interpreter Lock 会阻止多个本机线程同时执行相同的 Python 字节码。 multiprocessing 通过为每个工作进程生成一个单独的 Python 解释器,并使用 pickling 向工作进程发送参数和 return 变量来解决此限制。不幸的是,这需要很多不可避免的开销。

如果您绝对必须使用multiprocessing,建议对每个进程做尽可能多的工作,以尽量减少产卵和花费的相对时间杀死进程。例如,如果您正在并行处理较大数组的块,则使这些块尽可能大,并一次完成尽可能多的处理步骤,而不是多次遍历数组。

不过,一般来说,使用不受 GIL 限制的低级语言进行多线程处理会更好。对于简单的数值表达式,例如您的示例,numexpr 是实现显着性能提升的一种非常简单的方法(~4x,在具有 4 个内核和超线程的 i7 CPU 上)。除了在 C++ 中实现并行处理,一个更显着的好处是它避免了为中间结果分配内存,从而更有效地使用缓存。

In [1]: import numexpr as ne

In [2]: u = np.arange(6e7)

In [3]: %%timeit u = np.arange(6e7)
   .....: u**2
   .....: 
1 loop, best of 3: 528 ms per loop

In [4]: %%timeit u = np.arange(6e7)
ne.evaluate("u**2")
   .....: 
10 loops, best of 3: 127 ms per loop

适用于更复杂任务的其他选项包括 Cython and numba

最后,我还应该提到,除了 CPython 之外,还有其他 Python 实现缺少 GIL,例如 PyPy、Jython 和 IronPython。然而,这些都有其自身的局限性。据我所知,其中 none 为 numpy、scipy 或 matplotlib 提供了适当的支持。

我自己回答:

来自 scipy-cookbook,恕我直言,一个被低估的功能:

while numpy is doing an array operation, python also releases the GIL.

所以多线程对于 numpy 操作来说不是问题。

from threading import Thread
import numpy as np

u=np.arange(6*10**7)

def multi(N):
    n=u.size//N 
    threads = [Thread(target=np.ndarray.__ipow__,
               args=(u[k*n:(k+1)*n],2)) for k in range(N)]  
    for t in  threads: t.start()
    for t in  threads: t.join()

在 2 核处理器上获得近 2 倍的增益:

In [7]: %timeit test(1)
10 loops, best of 3: 172 ms per loop

In [8]: %timeit test(4)
10 loops, best of 3: 92.7 ms per loop