Removing loops with numpy.einsum
I have some nested loops (three in total) in which I'm trying to use numpy.einsum to speed up my calculations, but I'm struggling to get the notation right. I managed to get rid of one loop, but I can't figure out the other two. Here's what I've got so far:
import numpy as np
import time
def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for qi in range(nq):
            y[qi] += np.einsum('i,i', f[ri,qi]*f[:,qi], np.sinc(q[qi]*r[ri,:]/np.pi))
    return y
r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
start = time.time()
y = myfunc(r, q, f)
end = time.time()
print(end-start)
While this is much faster than the original, it's still too slow, taking about 30 seconds. Note that the original, without the einsum call, was the following (it looked like it would take around 2.5 hours, so I didn't wait around to find out):
def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi)
    return y
Does anyone know how I can get rid of these loops using einsum, or any other related tool?
Your function seems to be equivalent to the following:
# broadcasting: (nr, nr, 1) * (nq,) -> (nr, nr, nq)
s = np.sinc(q * r[...,None]/np.pi)
np.einsum('iq,jq,ijq->q',f,f,s)
This takes about 20 seconds on my system, with most of the time spent allocating s.
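To see why the allocation dominates, here is a rough back-of-the-envelope estimate (an illustrative sketch, not from the original answer) of the size of s for the 1000x1000 / 1001-point case in the question:

# s has shape (nr, nr, nq) = (1000, 1000, 1001) and dtype float64 (8 bytes per element)
nr, nq = 1000, 1001
print(nr * nr * nq * 8 / 1e9)   # ~8.0 GB for the temporary array alone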
Let's test it with a small sample:
np.random.seed(1)
r = np.random.random(size=(10,10))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
s = np.sinc(q * r[...,None]/np.pi)
(np.abs(np.einsum('iq,jq,ijq->q',f,f,s) - myfunc(r,q,f)) < 1e-6).all()
# True
Since np.sinc is not a linear operator, I'm not quite sure how to reduce the run time further.
sinc is the real bottleneck here, as also pointed out in the answer above. We will reuse the einsum expression from there and end up with something like this -

Now, from the docs, numpy.sinc(x) is sin(πx)/(πx). We will make use of that -
v = q*r[...,None]                  # shape (nr, nr, nq); sinc(q*r/pi) == sin(q*r)/(q*r)
p = np.sin(v)/v
mask = (q==0) | (r==0)[...,None]   # sinc(0) == 1, so fix the 0/0 entries
p[mask] = 1
out = np.einsum('iq,jq,ijq->q',f,f,p)
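As a quick sanity check (a minimal sketch, not part of the original answer), the masked sin(v)/v construction can be compared against np.sinc on a small sample:

import numpy as np

np.random.seed(0)
r = np.random.random(size=(10,10))
q = np.linspace(0,1,11)            # q[0] == 0, so the mask actually matters

v = q*r[...,None]
with np.errstate(invalid='ignore'):
    p = np.sin(v)/v                # 0/0 -> nan wherever q or r is zero
mask = (q==0) | (r==0)[...,None]
p[mask] = 1                        # sinc(0) == 1

print(np.allclose(p, np.sinc(q*r[...,None]/np.pi)))   # expected: True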
Additionally, for large data, we can leverage multi-core processing with numexpr, like so -
import numexpr as ne
p = ne.evaluate('sin(q*r3D)/(q*r3D)', {'r3D':r[...,None]})
mask = (q==0) | (r==0)[...,None]
p[mask] = 1
out = np.einsum('iq,jq,ijq->q',f,f,p)
Timings with arrays of length 500 -
In [12]: r = np.random.random(size=(500,500))
...: q = np.linspace(0,1,501)
...: f = np.random.random(size=(r.shape[0],q.shape[0]))
# Original soln with einsum
In [15]: %%timeit
...: nr = r.shape[0]
...: nq = q.shape[0]
...: y = np.zeros(nq)
...: for ri in range(nr):
...: for qi in range(nq):
...: y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
9.75 s ± 977 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# @Quang Hoang's soln
In [16]: %%timeit
...: s = np.sinc(q * r[...,None]/np.pi)
...: np.einsum('iq,jq,ijq->q',f,f,s)
2.75 s ± 7.82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [17]: %%timeit
...: p = ne.evaluate('sin(q3D*r)/(q3D*r)', {'q3D':q[:,None,None]})
...: mask = (q==0)[:,None,None] | (r==0)
...: p[mask] = 1
...: out = np.einsum('iq,jq,qij->q',f,f,p)
1.39 s ± 23.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [18]: %%timeit
...: v = q*r[...,None]
...: p = np.sin(v)/v
...: mask = (q==0) | (r==0)[...,None]
...: p[mask] = 1
...: out = np.einsum('iq,jq,ijq->q',f,f,p)
2.11 s ± 7.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
For larger data, we would expect the numexpr version to do better, as long as we don't run into out-of-memory cases.
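If needed, the thread count that numexpr uses can be inspected and capped explicitly (a small illustrative sketch; the specific numbers are assumptions, not from the original answer):

import numexpr as ne

print(ne.detect_number_of_cores())   # e.g. 8, depending on the machine
ne.set_num_threads(4)                # cap subsequent ne.evaluate() calls at 4 threads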
I'm writing this answer because I really learned a lot about einsum from this question, and sharing my thoughts from working through it will reinforce my understanding.
The problem is to design a suitable einsum operation for

y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
Look at the shapes of the arrays involved. Inputs: r --> (a,a), q --> (c,) and f --> (a,c); output: y --> (c,). Given these shapes, for the q[qi]*r[ri,:] part, a new array of shape (a,a,c) has to be created, via something like r[...,None]*q.
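A minimal sketch (with made-up sizes a=3, c=5) confirming that this broadcasting produces the (a,a,c) array whose [ri,:,qi] slice equals q[qi]*r[ri,:]:

import numpy as np

a, c = 3, 5                        # illustrative sizes
r = np.random.random((a, a))
q = np.linspace(0, 1, c)

s = r[...,None]*q                  # (a, a, 1) broadcast against (c,) -> (a, a, c)
print(s.shape)                     # (3, 3, 5)

ri, qi = 1, 2
print(np.allclose(s[ri,:,qi], q[qi]*r[ri,:]))   # True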
Since np.sinc does not change the shape of its argument, and for a fixed pair (ri,qi) the factor f[ri,qi] is just a scalar, we should first look for the einsum operation that reproduces np.einsum('i,i', f[:,qi], np.sinc(q[qi]*r[ri,:]/np.pi)) for all pairs (ri,qi) at once, i.e. how to obtain an array of shape (a,c) from two arrays of shapes (a,c) and (a,a,c). Intuitively, that is kl,ikl->il.
That first contraction gives an array of shape (a,c); since f also has shape (a,c), the final operation that brings in the remaining f[ri,qi] factor and sums over ri is il,il->l.
Based on the analysis above, we arrive at the solution:
s = np.sinc(q*r[...,None]/np.pi)
res = np.einsum('kl,ikl->il',f,s)
res = np.einsum('il,il->l',f,res)
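A small check (an illustrative sketch, not part of the original answer) that this two-step contraction matches the one-shot 'iq,jq,ijq->q' expression from the earlier answer:

import numpy as np

np.random.seed(1)
r = np.random.random((10, 10))
q = np.linspace(0, 1, 21)
f = np.random.random((r.shape[0], q.shape[0]))

s = np.sinc(q*r[...,None]/np.pi)

res = np.einsum('kl,ikl->il', f, s)   # contract over the first r index
res = np.einsum('il,il->l', f, res)   # multiply in f[ri,qi] and sum over ri

ref = np.einsum('iq,jq,ijq->q', f, f, s)
print(np.allclose(res, ref))          # expected: True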
The simplest approach (and probably also the best-performing one) is to use a compiler such as Numba. Since this function relies on the sinc function, also make sure that you have Intel SVML installed.
Example
import numpy as np
import numba as nb

@nb.njit(fastmath=True,parallel=False,error_model="numpy",cache=True)
def myfunc(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi)
    return y

@nb.njit(fastmath=True,parallel=True,error_model="numpy",cache=True)
def myfunc_opt(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)
    #for contiguous memory access in the loop
    f_T = np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc = 0
        for ri in range(nr):
            for rj in range(nr):
                acc += f_T[qi,ri]*f_T[qi,rj]*np.sinc(q[qi]*r[ri,rj]/np.pi)
        y[qi] = acc
    return y

@nb.njit(fastmath=True,parallel=True,error_model="numpy",cache=True)
def myfunc_opt_2(r, q, f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)
    f_T = np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc = 0
        for ri in range(nr):
            for rj in range(nr):
                #Test carefully!
                if q[qi]*r[ri,rj] != 0.:
                    acc += f_T[qi,ri]*f_T[qi,rj]*np.sin(q[qi]*r[ri,rj])/(q[qi]*r[ri,rj])
                else:
                    acc += f_T[qi,ri]*f_T[qi,rj]
        y[qi] = acc
    return y

def numpy_func(r, q, f):
    s = np.sinc(q * r[...,None]/np.pi)
    return np.einsum('iq,jq,ijq->q',f,f,s)
Timings for small arrays
r = np.random.random(size=(500,500))
q = np.linspace(0,1,501)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r, q, f)
#765 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt(r, q, f)
#158 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt_2(r, q, f)
#51.5 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit y = numpy_func(r, q, f)
#3.81 s ± 61.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
print(np.allclose(numpy_func(r, q, f),myfunc(r, q, f)))
#True
print(np.allclose(numpy_func(r, q, f),myfunc_opt(r, q, f)))
#True
print(np.allclose(numpy_func(r, q, f),myfunc_opt_2(r, q, f)))
Timings for larger arrays
r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r, q, f)
#6.1 s ± 4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt(r, q, f)
#1.26 s ± 18.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit y = myfunc_opt_2(r, q, f)
#397 ms ± 2.69 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)