最大限度地减少由于大量 Numpy 点调用而产生的开销

Minimizing overhead due to the large number of Numpy dot calls

我的问题如下,我有一个迭代算法,每次迭代都需要执行几个矩阵-矩阵乘法 dot(A_i, B_i),因为 i = 1 ... k。由于这些乘法是使用 Numpy 的点执行的,我知道它们正在调用 BLAS-3 实现,这非常快。问题是调用次数巨大,结果成为我程序的瓶颈。我想通过减少产品但使用更大的矩阵来最大程度地减少所有这些调用的开销。

为简单起见,假设所有矩阵都是n x n(通常n不大,范围在1到1000之间)。解决我的问题的一种方法是考虑块对角矩阵 diag(A_i) 并执行下面的产品。

这只是对函数 dot 的一次调用,但现在程序浪费了很多时间来执行与零的乘法。这个想法似乎行不通,但它给出了结果 [A_1 B_1, ..., A_k B_k],即所有产品堆叠在一个大矩阵中。

我的问题是,有没有办法计算[A_1 B_1, ..., A_k B_k] 用一个函数调用?或者更重要的是,我怎样才能比制作一个 Numpy 点循环更快地计算这些乘积?

您可以堆叠数组以具有形状 (k, n, n),然后调用 numpy.matmul 或使用 @ 运算符。

例如,

In [18]: A0 = np.array([[1, 2], [3, 4]])                                                                 

In [19]: A1 = np.array([[1, 2], [-3, 5]])                                                                

In [20]: A2 = np.array([[4, 0], [1, 1]])                                                                 

In [21]: B0 = np.array([[1, 4], [-3, 4]])                                                                

In [22]: B1 = np.array([[2, 1], [1, 1]])                                                                 

In [23]: B2 = np.array([[-2, 9], [0, 1]])                                                                

In [24]: np.matmul([A0, A1, A2], [B0, B1, B2])                                                           
Out[24]: 
array([[[-5, 12],
        [-9, 28]],

       [[ 4,  3],
        [-1,  2]],

       [[-8, 36],
        [-2, 10]]])

或者,使用 @:

In [32]: A = np.array([A0, A1, A2])                                                                      

In [33]: A                                                                                               
Out[33]: 
array([[[ 1,  2],
        [ 3,  4]],

       [[ 1,  2],
        [-3,  5]],

       [[ 4,  0],
        [ 1,  1]]])

In [34]: B = np.array([B0, B1, B2])                                                                      

In [35]: A @ B                                                                                           
Out[35]: 
array([[[-5, 12],
        [-9, 28]],

       [[ 4,  3],
        [-1,  2]],

       [[-8, 36],
        [-2, 10]]])

这取决于矩阵的大小

编辑

对于较大的 nxn 矩阵(大约大小 20),编译代码的 BLAS 调用速度更快,对于较小的矩阵,自定义 Numba 或 Cython 内核通常更快。

以下方法为给定的输入形状生成自定义点函数。通过这种方法,还可以受益于编译器相关的优化,例如循环展开,这对于小矩阵尤为重要。

需要注意的是,生成和编译一个内核需要大约。 1s,因此请确保仅在确实需要时才调用生成器。

生成器函数

def gen_dot_nm(x,y,z):
    #small kernels
    @nb.njit(fastmath=True,parallel=True)
    def dot_numba(A,B):
        """
        calculate dot product for (x,y)x(y,z)
        """
        assert A.shape[0]==B.shape[0]
        assert A.shape[2]==B.shape[1]

        assert A.shape[1]==x
        assert B.shape[1]==y
        assert B.shape[2]==z

        res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
        for ii in nb.prange(A.shape[0]):
            for i in range(x):
                for j in range(z):
                    acc=0.
                    for k in range(y):
                        acc+=A[ii,i,k]*B[ii,k,j]
                    res[ii,i,j]=acc
        return res

    #large kernels
    @nb.njit(fastmath=True,parallel=True)
    def dot_BLAS(A,B):
        assert A.shape[0]==B.shape[0]
        assert A.shape[2]==B.shape[1]

        res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
        for ii in nb.prange(A.shape[0]):
            res[ii]=np.dot(A[ii],B[ii])
        return res

    #At square matices above size 20
    #calling BLAS is faster
    if x>=20 or y>=20 or z>=20:
        return dot_BLAS
    else:
        return dot_numba

用法示例

A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)

dot22=gen_dot_nm(2,2,2)
X=dot22(A,B)
%timeit X3=dot22(A,B)
#5.94 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) 

旧答案

另一种方法是使用一些特殊的 BLAS 实现,但需要做更多的工作,它会及时为非常小的矩阵创建 custom kernels,而不是从 C 调用此内核。

例子

import numpy as np
import numba as nb

#Don't use this for larger submatrices
@nb.njit(fastmath=True,parallel=True)
def dot(A,B):
    assert A.shape[0]==B.shape[0]
    assert A.shape[2]==B.shape[1]

    res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
    for ii in nb.prange(A.shape[0]):
        for i in range(A.shape[1]):
            for j in range(B.shape[2]):
                acc=0.
                for k in range(B.shape[1]):
                    acc+=A[ii,i,k]*B[ii,k,j]
                res[ii,i,j]=acc
    return res

@nb.njit(fastmath=True,parallel=True)
def dot_22(A,B):
    assert A.shape[0]==B.shape[0]
    assert A.shape[1]==2
    assert A.shape[2]==2
    assert B.shape[1]==2
    assert B.shape[2]==2

    res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
    for ii in nb.prange(A.shape[0]):
        res[ii,0,0]=A[ii,0,0]*B[ii,0,0]+A[ii,0,1]*B[ii,1,0]
        res[ii,0,1]=A[ii,0,0]*B[ii,0,1]+A[ii,0,1]*B[ii,1,1]
        res[ii,1,0]=A[ii,1,0]*B[ii,0,0]+A[ii,1,1]*B[ii,1,0]
        res[ii,1,1]=A[ii,1,0]*B[ii,0,1]+A[ii,1,1]*B[ii,1,1]
    return res

计时

A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)

X=A@B
X2=np.einsum("xik,xkj->xij",A,B)
X3=dot_22(A,B) #avoid measurig compilation overhead
X4=dot(A,B)    #avoid measurig compilation overhead

%timeit X=A@B
#262 µs ± 2.55 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.einsum("xik,xkj->xij",A,B,optimize=True)
#264 µs ± 3.22 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit X3=dot_22(A,B)
#5.68 µs ± 27.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit X4=dot(A,B)
#9.79 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

如果你不想浪费时间乘以零,那么你真正想要的是稀疏矩阵。使用来自@WarrenWeckesser 的 AB 矩阵:

from scipy import sparse
sparse.block_diag((A0, A1, A2), format = "csr") @ np.concatenate((B0, B1, B2), axis = 0)
Out[]: 
array([[-5, 12],
       [-9, 28],
       [ 4,  3],
       [-1,  2],
       [-8, 36],
       [-2, 10]], dtype=int32)

这可能是大型矩阵的加速。对于较小的 @max9111 可能有正确的想法使用 numba.