使用 numpy.einsum 删除循环

removing loops with numpy.einsum

我有一些嵌套循环(总共三个),我在其中尝试使用 numpy.einsum 来加速计算,但我正在努力使符号正确。我设法摆脱了一个循环,但我无法弄清楚另外两个。到目前为止,这是我得到的:

import numpy as np
import time

def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for qi in range(nq):
            y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
    return y

r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))

start = time.time()
y = myfunc(r, q, f)
end = time.time()

print(end-start)

虽然这比原来快得多,但还是太慢了,大约需要 30 秒。请注意,没有 einsum 调用的原始内容如下(看起来需要约 2.5 小时,等不及确定了):

def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi))
    return y

有谁知道如何使用 einsum 或任何其他相关工具摆脱这些循环?

您的函数似乎等同于以下内容:

# this is so called broadcasting
s = np.sinc(q * r[...,None]/np.pi)

np.einsum('iq,jq,ijq->q',f,f,s)

这在我的系统上花费了大约 20 秒,大部分时间用于分配 s

让我们用小样本测试一下:

np.random.seed(1)
r = np.random.random(size=(10,10))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
(np.abs(np.einsum('iq,jq,ijq->q',f,f,s) - myfunc(r,q,f)) < 1e-6).all()
# True

由于 np.sinc 不是线性运算符,我不太确定如何进一步减少 运行 时间。

sinc 才是真正的瓶颈, 中也提到了这一点。我们将使用那里的 einsum 表达式以这样的方式结束 -

现在,从 docs 开始,numpy.sinc(x) 是:\sin(\pi x)/(\pi x)。我们将利用它 -

v = q*r[...,None]
p = np.sin(v)/v
mask = (q==0) | (r==0)[...,None]
p[mask] = 1
out = np.einsum('iq,jq,ijq->q',f,f,p)

此外,对于大数据,我们可以利用 multi-cores 和 numexpr,就像这样 -

import numexpr as ne

p = ne.evaluate('sin(q*r3D)/(q*r3D)', {'r3D':r[...,None]})
mask = (q==0) | (r==0)[...,None]
p[mask] = 1
out = np.einsum('iq,jq,ijq->q',f,f,p)

500 个长度数组的计时 -

In [12]: r = np.random.random(size=(500,500))
    ...: q = np.linspace(0,1,501)
    ...: f = np.random.random(size=(r.shape[0],q.shape[0]))

# Original soln with einsum
In [15]: %%timeit
    ...: nr = r.shape[0]
    ...: nq = q.shape[0]
    ...: y = np.zeros(nq)
    ...: for ri in range(nr):
    ...:     for qi in range(nq):
    ...:         y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
9.75 s ± 977 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# @Quang Hoang's soln
In [16]: %%timeit
    ...: s = np.sinc(q * r[...,None]/np.pi)
    ...: np.einsum('iq,jq,ijq->q',f,f,s)
2.75 s ± 7.82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [17]: %%timeit
    ...: p = ne.evaluate('sin(q3D*r)/(q3D*r)', {'q3D':q[:,None,None]})
    ...: mask = (q==0)[:,None,None] | (r==0)
    ...: p[mask] = 1
    ...: out = np.einsum('iq,jq,qij->q',f,f,p)
1.39 s ± 23.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [18]: %%timeit
    ...: v = q*r[...,None]
    ...: p = np.sin(v)/v
    ...: mask = (q==0) | (r==0)[...,None]
    ...: p[mask] = 1
    ...: out = np.einsum('iq,jq,ijq->q',f,f,p)
2.11 s ± 7.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

对于更大的数据,我们希望 numexpr 一个表现更好,只要我们不 运行 进入 out-of-memory 个案例。

我写这个答案是因为我真的从那里学到了很多关于einsum的知识,如果我分享我在解决这个问题的过程中的想法,将会加强我的理解。

问题是为

设计一个合适的einsum操作

y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))

  1. 查看相关数组的形状。输入:r --> (a,a)q --> (c,)f --> (a,c);输出:y --> (c,)。从这些形状中,对于 q[qi]*r[ri,:] 部分,必须通过类似 r[...,None]*q.

    的方式创建形状为 (a,a,c) 的新数组
  2. 由于np.sinc不会改变数组的形状,而对于固定的一对(ri,qi)f[ri,qi]只是一个数字,我们首先应该想到einsum 操作将重现 f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi),即如何从两个形状数组 (a,c)(a,a,c) 获得形状数组 (a,c)。直观上就是kl,ikl->il.

  3. 对于固定的一对(ri,qi),因为f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi)给出了一个形状为(a,c)的数组,而f的形状为[=26] =],最后的操作就是il,il->l

根据上面的分析,我们有了解决方案:

s = q*r[...,None]/np.pi
res = np.einsum('kl,ikl->il',f,s)
res = np.einsum('il,il->l',f,res)

最简单的方法(也可能是性能最好的)是使用编译器,例如 Numba。由于此功能依赖于 sinc 功能,因此还要确保您已安装 Intel SVML

例子

import numpy as np
import numba as nb

@nb.njit(fastmath=True,parallel=False,error_model="numpy",cache=True)
def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi)
    return y

@nb.njit(fastmath=True,parallel=True,error_model="numpy",cache=True)
def myfunc_opt(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)

    #for contiguous memory access in the loop
    f_T=np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc=0
        for ri in range(nr):
            for rj in range(nr):
                acc += f_T[qi,ri]*f_T[qi,rj]*np.sinc(q[qi]*r[ri,rj]/np.pi)
        y[qi]=acc
    return y

@nb.njit(fastmath=True,parallel=True,error_model="numpy",cache=True)
def myfunc_opt_2(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)


    f_T=np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc=0
        for ri in range(nr):
            for rj in range(nr):
                #Test carefully!
                if q[qi]*r[ri,rj]!=0.:
                    acc += f_T[qi,ri]*f_T[qi,rj]*np.sin(q[qi]*r[ri,rj])/(q[qi]*r[ri,rj])
                else:
                    acc += f_T[qi,ri]*f_T[qi,rj]
        y[qi]=acc
    return y

def numpy_func(r, q, f):
    s = np.sinc(q * r[...,None]/np.pi)
    return np.einsum('iq,jq,ijq->q',f,f,s)

小数组计时

r = np.random.random(size=(500,500))
q = np.linspace(0,1,501)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r, q, f)
#765 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt(r, q, f)
#158 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt_2(r, q, f)
#51.5 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit y = numpy_func(r, q, f)
#3.81 s ± 61.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
print(np.allclose(numpy_func(r, q, f),myfunc(r, q, f)))
#True
print(np.allclose(numpy_func(r, q, f),myfunc_opt(r, q, f)))
#True
print(np.allclose(numpy_func(r, q, f),myfunc_opt_2(r, q, f)))

更大数组的计时

r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r, q, f)
#6.1 s ± 4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt(r, q, f)
#1.26 s ± 18.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt_2(r, q, f)
#397 ms ± 2.69 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)