Cython 加速没有预期的那么大
Cython speedup isn't as large as expected
我写了一个 Python 函数来计算大量 (N ~ 10^3) 粒子之间的成对电磁相互作用,并将结果存储在 NxN complex128 ndarray 中。它运行,但它是较大程序中最慢的部分,当 N=900 [更正] 时大约需要 40 秒。原始代码如下所示:
import numpy as np
def interaction(s,alpha,kprop): # s is an Nx3 real array
# alpha is complex
# kprop is float
ndipoles = s.shape[0]
Amat = np.zeros((ndipoles,3, ndipoles, 3), dtype=np.complex128)
I = np.array([[1,0,0],[0,1,0],[0,0,1]])
im = complex(0,1)
k2 = kprop*kprop
for i in range(ndipoles):
xi = s[i,:]
for j in range(ndipoles):
if i != j:
xj = s[j,:]
dx = xi-xj
R = np.sqrt(dx.dot(dx))
n = dx/R
kR = kprop*R
kR2 = kR*kR
A = ((1./kR2) - im/kR)
nxn = np.outer(n, n)
nxn = (3*A-1)*nxn + (1-A)*I
nxn *= -alpha*(k2*np.exp(im*kR))/R
else:
nxn = I
Amat[i,:,j,:] = nxn
return(Amat.reshape((3*ndipoles,3*ndipoles)))
我以前从未使用过 Cython,但这似乎是我努力加快速度的好起点,所以我几乎盲目地采用了我在在线教程中找到的技术。我得到了一些加速(30 秒对 40 秒),但远没有我预期的那么戏剧化,所以我想知道我是做错了什么还是错过了关键步骤。以下是我对上述例程进行 cythonizing 的最佳尝试:
import numpy as np
cimport numpy as np
DTYPE = np.complex128
ctypedef np.complex128_t DTYPE_t
def interaction(np.ndarray s, DTYPE_t alpha, float kprop):
cdef float k2 = kprop*kprop
cdef int i,j
cdef np.ndarray xi, xj, dx, n, nxn
cdef float R, kR, kR2
cdef DTYPE_t A
cdef int ndipoles = s.shape[0]
cdef np.ndarray Amat = np.zeros((ndipoles,3, ndipoles, 3), dtype=DTYPE)
cdef np.ndarray I = np.array([[1,0,0],[0,1,0],[0,0,1]])
cdef DTYPE_t im = complex(0,1)
for i in range(ndipoles):
xi = s[i,:]
for j in range(ndipoles):
if i != j:
xj = s[j,:]
dx = xi-xj
R = np.sqrt(dx.dot(dx))
n = dx/R
kR = kprop*R
kR2 = kR*kR
A = ((1./kR2) - im/kR)
nxn = np.outer(n, n)
nxn = (3*A-1)*nxn + (1-A)*I
nxn *= -alpha*(k2*np.exp(im*kR))/R
else:
nxn = I
Amat[i,:,j,:] = nxn
return(Amat.reshape((3*ndipoles,3*ndipoles)))
NumPy 的真正强大之处在于以矢量化方式对大量元素执行操作,而不是在分布在循环中的块中使用该操作。在您的例子中,您使用了两个嵌套循环和一个 IF 条件语句。我会建议扩展中间数组的维度,这将使 NumPy's powerful broadcasting capability
发挥作用,因此可以一次性对所有元素使用相同的操作,而不是循环中的小块数据。
为了扩展维度,可以使用None
/np.newaxis
。因此,遵循这样一个前提的矢量化实现看起来像这样 -
def vectorized_interaction(s,alpha,kprop):
im = complex(0,1)
I = np.array([[1,0,0],[0,1,0],[0,0,1]])
k2 = kprop*kprop
# Vectorized calculations for dx, R, n, kR, A
sd = s[:,None] - s
Rv = np.sqrt((sd**2).sum(2))
nv = sd/Rv[:,:,None]
kRv = Rv*kprop
Av = (1./(kRv*kRv)) - im/kRv
# Vectorized calculation for: "nxn = np.outer(n, n)"
nxnv = nv[:,:,:,None]*nv[:,:,None,:]
# Vectorized calculation for: "(3*A-1)*nxn + (1-A)*I"
P = (3*Av[:,:,None,None]-1)*nxnv + (1-Av[:,:,None,None])*I
# Vectorized calculation for: "-alpha*(k2*np.exp(im*kR))/R"
multv = -alpha*(k2*np.exp(im*kRv))/Rv
# Vectorized calculation for: "nxn *= -alpha*(k2*np.exp(im*kR))/R"
outv = P*multv[:,:,None,None]
# Simulate ELSE part of the conditional statement"if i != j:"
# with masked setting to I on the last two dimensions
outv[np.eye((N),dtype=bool)] = I
return outv.transpose(0,2,1,3).reshape(N*3,-1)
运行时测试和输出验证 -
案例 #1:
In [703]: N = 10
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [704]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [705]: %timeit interaction(s,alpha,kprop)
100 loops, best of 3: 7.6 ms per loop
In [706]: %timeit vectorized_interaction(s,alpha,kprop)
1000 loops, best of 3: 304 µs per loop
案例 #2:
In [707]: N = 100
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [708]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [709]: %timeit interaction(s,alpha,kprop)
1 loops, best of 3: 826 ms per loop
In [710]: %timeit vectorized_interaction(s,alpha,kprop)
100 loops, best of 3: 14 ms per loop
案例 #3:
In [711]: N = 900
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [712]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [713]: %timeit interaction(s,alpha,kprop)
1 loops, best of 3: 1min 7s per loop
In [714]: %timeit vectorized_interaction(s,alpha,kprop)
1 loops, best of 3: 1.59 s per loop
我写了一个 Python 函数来计算大量 (N ~ 10^3) 粒子之间的成对电磁相互作用,并将结果存储在 NxN complex128 ndarray 中。它运行,但它是较大程序中最慢的部分,当 N=900 [更正] 时大约需要 40 秒。原始代码如下所示:
import numpy as np
def interaction(s,alpha,kprop): # s is an Nx3 real array
# alpha is complex
# kprop is float
ndipoles = s.shape[0]
Amat = np.zeros((ndipoles,3, ndipoles, 3), dtype=np.complex128)
I = np.array([[1,0,0],[0,1,0],[0,0,1]])
im = complex(0,1)
k2 = kprop*kprop
for i in range(ndipoles):
xi = s[i,:]
for j in range(ndipoles):
if i != j:
xj = s[j,:]
dx = xi-xj
R = np.sqrt(dx.dot(dx))
n = dx/R
kR = kprop*R
kR2 = kR*kR
A = ((1./kR2) - im/kR)
nxn = np.outer(n, n)
nxn = (3*A-1)*nxn + (1-A)*I
nxn *= -alpha*(k2*np.exp(im*kR))/R
else:
nxn = I
Amat[i,:,j,:] = nxn
return(Amat.reshape((3*ndipoles,3*ndipoles)))
我以前从未使用过 Cython,但这似乎是我努力加快速度的好起点,所以我几乎盲目地采用了我在在线教程中找到的技术。我得到了一些加速(30 秒对 40 秒),但远没有我预期的那么戏剧化,所以我想知道我是做错了什么还是错过了关键步骤。以下是我对上述例程进行 cythonizing 的最佳尝试:
import numpy as np
cimport numpy as np
DTYPE = np.complex128
ctypedef np.complex128_t DTYPE_t
def interaction(np.ndarray s, DTYPE_t alpha, float kprop):
cdef float k2 = kprop*kprop
cdef int i,j
cdef np.ndarray xi, xj, dx, n, nxn
cdef float R, kR, kR2
cdef DTYPE_t A
cdef int ndipoles = s.shape[0]
cdef np.ndarray Amat = np.zeros((ndipoles,3, ndipoles, 3), dtype=DTYPE)
cdef np.ndarray I = np.array([[1,0,0],[0,1,0],[0,0,1]])
cdef DTYPE_t im = complex(0,1)
for i in range(ndipoles):
xi = s[i,:]
for j in range(ndipoles):
if i != j:
xj = s[j,:]
dx = xi-xj
R = np.sqrt(dx.dot(dx))
n = dx/R
kR = kprop*R
kR2 = kR*kR
A = ((1./kR2) - im/kR)
nxn = np.outer(n, n)
nxn = (3*A-1)*nxn + (1-A)*I
nxn *= -alpha*(k2*np.exp(im*kR))/R
else:
nxn = I
Amat[i,:,j,:] = nxn
return(Amat.reshape((3*ndipoles,3*ndipoles)))
NumPy 的真正强大之处在于以矢量化方式对大量元素执行操作,而不是在分布在循环中的块中使用该操作。在您的例子中,您使用了两个嵌套循环和一个 IF 条件语句。我会建议扩展中间数组的维度,这将使 NumPy's powerful broadcasting capability
发挥作用,因此可以一次性对所有元素使用相同的操作,而不是循环中的小块数据。
为了扩展维度,可以使用None
/np.newaxis
。因此,遵循这样一个前提的矢量化实现看起来像这样 -
def vectorized_interaction(s,alpha,kprop):
im = complex(0,1)
I = np.array([[1,0,0],[0,1,0],[0,0,1]])
k2 = kprop*kprop
# Vectorized calculations for dx, R, n, kR, A
sd = s[:,None] - s
Rv = np.sqrt((sd**2).sum(2))
nv = sd/Rv[:,:,None]
kRv = Rv*kprop
Av = (1./(kRv*kRv)) - im/kRv
# Vectorized calculation for: "nxn = np.outer(n, n)"
nxnv = nv[:,:,:,None]*nv[:,:,None,:]
# Vectorized calculation for: "(3*A-1)*nxn + (1-A)*I"
P = (3*Av[:,:,None,None]-1)*nxnv + (1-Av[:,:,None,None])*I
# Vectorized calculation for: "-alpha*(k2*np.exp(im*kR))/R"
multv = -alpha*(k2*np.exp(im*kRv))/Rv
# Vectorized calculation for: "nxn *= -alpha*(k2*np.exp(im*kR))/R"
outv = P*multv[:,:,None,None]
# Simulate ELSE part of the conditional statement"if i != j:"
# with masked setting to I on the last two dimensions
outv[np.eye((N),dtype=bool)] = I
return outv.transpose(0,2,1,3).reshape(N*3,-1)
运行时测试和输出验证 -
案例 #1:
In [703]: N = 10
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [704]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [705]: %timeit interaction(s,alpha,kprop)
100 loops, best of 3: 7.6 ms per loop
In [706]: %timeit vectorized_interaction(s,alpha,kprop)
1000 loops, best of 3: 304 µs per loop
案例 #2:
In [707]: N = 100
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [708]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [709]: %timeit interaction(s,alpha,kprop)
1 loops, best of 3: 826 ms per loop
In [710]: %timeit vectorized_interaction(s,alpha,kprop)
100 loops, best of 3: 14 ms per loop
案例 #3:
In [711]: N = 900
...: s = np.random.rand(N,3) + complex(0,1)*np.random.rand(N,3)
...: alpha = 3j
...: kprop = 5.4
...:
In [712]: out_org = interaction(s,alpha,kprop)
...: out_vect = vectorized_interaction(s,alpha,kprop)
...: print np.allclose(np.real(out_org),np.real(out_vect))
...: print np.allclose(np.imag(out_org),np.imag(out_vect))
...:
True
True
In [713]: %timeit interaction(s,alpha,kprop)
1 loops, best of 3: 1min 7s per loop
In [714]: %timeit vectorized_interaction(s,alpha,kprop)
1 loops, best of 3: 1.59 s per loop