python 中二维数组运算的高效并行化

Efficient parallelization of operations on two dimensional array operations in python

我正在尝试使用 python 中的 joblib 库对二维数组进行并行化操作。这是我的代码

from joblib import Parallel, delayed
import multiprocessing
import numpy as np

# The code below just aggregates the base_array to form a new two dimensional array
base_array = np.ones((2**12, 2**12), dtype=np.uint8)
def compute_average(i, j):
    return np.uint8(np.mean(base_array[i*4: (i+1)*4, j*4: (j+1)*4]))

num_cores = multiprocessing.cpu_count()
new_array = np.array(Parallel(n_jobs=num_cores)(delayed(compute_average)(i, j) 
                                        for i in xrange(0,1024) for j in xrange(0,1024)), dtype=np.uint8)

上面的代码比下面的基本嵌套 for 循环花费更多的时间。

new_array_nested = np.ones((2**10, 2**10), dtype=np.uint8)
for i in xrange(0,1024):
    for j in xrange(0,1024):
         new_array_nested[i,j] = compute_average(i,j)

为什么并行操作需要更多时间?如何提高上面代码的效率?

Wow! Absolutely loved your code. It worked like a charm improving the total efficiency by 400x. I'll try to read more about numba and jit compilers, but can you write briefly of why it is so efficient. Thanks once again for all the help! – Ram Jan 3 '18 at 20:30

我们可以很容易地到达77 [ms]下的某个地方,但是需要掌握一些步骤才能到达那里,所以让我们开始吧:


问:为什么并行操作需要更多时间?

因为 joblib 的建议步骤创建了那么多的完整进程副本 - 以逃避 GIL 步进的 pure-[SERIAL] 跳舞(一个接一个)但是(!)这包括所有变量和整个 python 解释器及其内部状态,在它开始执行 "usefull" 的第一步之前 "payload"-计算策略,
所以
所有这些实例化开销的总和很容易变得比成反比的 1 / N 因子
的开销不可知期望大 你在哪里设置 N ~ num_cores.

For details, read the mathematical formulation in the tail part of Amdahl's Law re-formulation here.


Q:上面的代码可以帮助提高效率吗?

尽可能节省所有管理费用:
- 在可能的情况下:
- 在进程生成端,尝试使用n_jobs = ( num_cores - 1 )让更多space用于"main"前进的过程和性能提升的基准
- 在进程终止端,避免从return值中收集并构建一个新的(可能很大)对象,而是预分配一个刚刚-足够大的本地进程数据结构和 return 一些高效的、序列化的,以便轻松和非阻塞地合并每个部分 returned 结果的对齐方式。

这两个 "hidden" 成本都是您的主要设计敌人,因为它们会线性添加到计算路径的纯 [SERIAL] 部分整个问题的解决方案 ( ref.: the effects of both of these in the overhead-strict Amdahl's Law formula )


实验与结果:

>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> base_array = np.ones( (2**12, 2**12), dtype = np.uint8 )
>>> base_array.flags
  C_CONTIGUOUS : True
  F_CONTIGUOUS : False
  OWNDATA      : True
  WRITEABLE    : True
  ALIGNED      : True
  UPDATEIFCOPY : False
>>> def compute_average_per_TILE(               TILE_i,   TILE_j ): // NAIVE MODE
...     return np.uint8( np.mean( base_array[ 4*TILE_i:4*(TILE_i+1),
...                                           4*TILE_j:4*(TILE_j+1)
...                                           ]
...                               )
...                      )
... 
>>> aClk.start(); _ = compute_average_per_TILE( 12,13 ); aClk.stop()
25110
  102
  109
   93

每次射击大约需要 93 [us]。期望大约 1024*1024*93 ~ 97,517,568 [us] 涵盖整个 base_array 的均值处理。

在实验上,这里可以很好地看到未很好处理开销的影响,朴素的嵌套实验进行了:

>>> aClk.start(); _ = [ compute_average_per_TILE( i, j )
                                              for i    in xrange(1024)
                                              for    j in xrange(1024)
                        ]; aClk.stop()
26310594
^^...... 
26310594 / 1024. / 1024. == 25.09 [us/cell]

大约减少了 3.7 倍(由于未发生 "tail"-部分(分配单个 returned 值)开销 2**20次,但只有一次,在终端作业中。

然而,更多惊喜即将到来。


什么是合适的工具?

从来没有放之四海而皆准的法则,没有放之四海而皆准的法则。

给定
每次调用处理的不仅仅是一个 4x4 矩阵图块(根据提议的 joblib[= 实际上 少于 25 [us] 114=]-精心策划的 2**20 次调用,分布在 ~ .cpu_count() 由原始提案

完全实例化的进程
...( joblib.Parallel( n_jobs = num_cores )( 
     joblib.delayed( compute_average )( i, j ) 
                                    for i    in xrange( 1024 )
                                    for    j in xrange( 1024 )
     )

确实有一个space可以提高性能

对于这些小规模矩阵(并非所有问题在这个意义上都如此令人满意),可以期望从更智能的内存访问模式和减少 python GIL 引发的弱点中获得最佳结果。

由于每次调用跨度只是一个 4x4 微型计算,更好的方法是利用智能矢量化(所有数据都适合缓存,因此缓存内计算是追求最佳性能的假期之旅)

最好的(仍然是非常简单的向量化代码)
能够从 ~ 25 [us/cell] 小于 ~ 74 [ns/cell](仍然有一个 space 以便更好地对齐处理,因为它需要 ~ 4.6 [ns] / a base_array 单元格处理 ),所以如果缓存内优化的矢量化代码能够正确制作,那么期待另一个级别的加速。

77 [ms]?!值得这样做,不是吗?

不是97秒,
不是 25 秒,
但少于77 [ms]只需敲几下键盘,如果更好地优化呼号,可能会挤掉更多:

>>> import numba
>>> @numba.jit( nogil = True, nopython = True )
... def jit_avg2( base_IN, ret_OUT ):  // all pre-allocated memory for these data-structures
...     for i in np.arange( 1024 ):    // vectorised-code ready numpy iterator
...         for j in np.arange( 1024 ):// vectorised-code ready numpy iterator 
...             ret_OUT[i,j] = np.uint8( np.mean( base_IN[4*i:4*(i+1),
...                                                       4*j:4*(j+1)
...                                                       ]
...                                               )
...                                      )
...     return                         // avoid terminal assignment costs
... 

>>> aClk.start(); _ = jit_avg2( base_array, mean_array ); aClk.stop()
1586182 (even with all the jit-compilation circus, it was FASTER than GIL-stepped nested fors ...)
  76935
  77337