优化 python 中的双循环
Optimize Double loop in python
我正在尝试优化以下循环:
def numpy(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum(c*rho[ix-1:ix+3, iz])
b[ix, iz] = sum(c*rho[ix-2:ix+2, iz])
return a, b
我尝试了不同的解决方案,发现使用 numba 计算乘积之和可以带来更好的性能:
import numpy as np
import numba as nb
import time
@nb.autojit
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in range(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
def numba1(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.autojit
def numba2(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
nx = 1024
nz = 256
rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
ti = time.clock()
a, b = numpy(nx, nz, c, rho)
print 'Time numpy : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
a, b = numba1(nx, nz, c, rho)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
a, b = numba2(nx, nz, c, rho)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`
这导致
Time numpy : 4.1595
Time numba1 : 0.6993
Time numba2 : 1.0135
使用 numba 版本的求和函数 (sum_opt) 表现非常好。但我想知道为什么双循环函数 (numba2) 的 numba 版本会导致执行时间变慢。我尝试使用 jit 而不是 autojit,指定参数类型,但更糟。
我还注意到,首先在最小循环上循环比在最大循环上首先循环慢。有什么解释吗?
无论是,我确信这个双循环函数可以改进很多矢量化问题(如 this)或使用另一种方法(映射?),但我对这些方法有点困惑.
在我代码的其他部分,我使用了 numba 和 numpy 切片方法来替换所有显式循环,但在这种特殊情况下,我不知道如何设置它。
有什么想法吗?
编辑
感谢您的所有评论。我在这个问题上做了一些工作:
import numba as nb
import numpy as np
from scipy import signal
import time
@nb.jit(['float64(float64[:], float64[:])'], nopython=True)
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in xrange(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
@nb.autojit
def numba1(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.jit(nopython=True)
def numba2(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3a(nx, nz, c, rho, a):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
return a
@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3b(nx, nz, c, rho, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return b
def convol(nx, nz, c, aa, bb):
s1 = rho[1:nx-1,2:nz-3]
s2 = rho[0:nx-2,2:nz-3]
kernel = c[:,None][::-1]
aa[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
bb[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')
return aa, bb
nx = 1024
nz = 256
rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
ti = time.clock()
for i in range(1000):
a, b = numba1(nx, nz, c, rho, a, b)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a, b = numba2(nx, nz, c, rho, a, b)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a = numba3a(nx, nz, c, rho, a)
b = numba3b(nx, nz, c, rho, b)
print 'Time numba3 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a, b = convol(nx, nz, c, a, b)
print 'Time convol : ' + `round(time.clock() - ti, 4)`
您的解决方案非常优雅 Divakar,但我必须在我的代码中大量使用此功能。因此,对于 1000 次迭代,这导致
Time numba1 : 3.2487
Time numba2 : 3.7012
Time numba3 : 3.2088
Time convol : 22.7696
autojit
和 jit
非常接近。
但是,当使用 jit
时,指定所有参数类型似乎很重要。
我不知道当函数有多个输出时,是否有办法在 jit
装饰器中指定参数类型。有人吗?
目前我没有找到除使用 numba 之外的其他解决方案。欢迎新想法!
如 continuum 博客 performance section 中所述,autojit
即时编译,而 jit
提前编译:
Numba can compile just-in-time with the autojit decorator, or ahead of
time with the jit decorator
这意味着在许多情况下,autojit
意味着编译器可以对其编译的代码做出更有根据的猜测,然后进行优化。我知道提前进行即时编译听起来很矛盾,但是嘿。
But I am wondering why the numba version of the double loop function
(numba2) leads to slower execution times
Numba 不会提高任意函数调用的性能。虽然我不能肯定地说,但我的猜测是 JIT 编译的开销超过了这样做的好处(如果有任何好处的话)。
I also noticed that looping first on the smallest loop is slower than
looping first on the biggest loop. Is there any explanation ?
这可能是由于 cache miss。二维数组被分配为大小为 rows * columns
的连续内存块。获取到缓存的内容基于时间(最近使用过的内容)和空间(内存中与已使用内容接近的内容)局部性的组合,即被认为接下来要使用的内容。
首先遍历行时,您将按照数据在内存中出现的顺序进行遍历。当首先遍历列时,您每次 "skip" 内存中行的宽度,这使得您正在访问的内存位置不太可能被提取到缓存中。
2D array: [[1,2,3],[4,5,6],[7,8,9]]
In memory: 1 2 3 4 5 6 7 8 9
让我们假设一个过于简单、愚蠢的缓存获取算法,它获取 3 个后续内存位置。
迭代行优先:
In memory: 1 2 3 | 4 5 6 | 7 8 9
Accessed: 1 2 3 | 4 5 6 | 7 8 9
Cache miss: - - - | - - - | - - -
迭代列优先:
In memory: 1 2 3 | 4 5 6 | 7 8 9
Accessed: 1 4 7 | 2 5 8 | 3 6 9
Cache miss: - - - | x x x | x x x
您没有充分利用 numpy 的功能。 numpythonic 处理问题的方式类似于:
cs = np.zeros((nx+1, nz))
np.cumsum(c*rho, axis=0, out=cs[1:])
aa = cs[5:, 2:-3] - cs[1:-4, 2:-3]
bb = cs[4:-1, 2:-3] - cs[:-5, 2:-3]
aa
现在将占据 a
数组的中心非零部分:
>>> a[:5, :5]
array([[ 0. , 0. , 0. , 0. , 0. ],
[ 0. , 0. , 0. , 0. , 0. ],
[ 0. , 0. , 2.31296595, 2.15743042, 2.5853117 ],
[ 0. , 0. , 2.02697233, 2.83191016, 2.58819583],
[ 0. , 0. , 2.4086584 , 2.45175615, 2.19628507]])
>>>aa[:3, :3]
array([[ 2.31296595, 2.15743042, 2.5853117 ],
[ 2.02697233, 2.83191016, 2.58819583],
[ 2.4086584 , 2.45175615, 2.19628507]])
bb
和 b
也类似。
在我的系统上,使用您的示例输入,此代码的运行速度比您的 numpy
函数快 300 倍以上。根据您的时间安排,这将比 numba 快一到两个数量级。
你基本上是在那里执行二维卷积,只是你的内核没有像通常的 convolution
操作那样反转。
所以,基本上,我们需要做两件事来使用 signal.convolve2d
来解决我们的案例 -
- 将输入数组
rho
切片为 select 它的一部分,用于您的代码的原始循环版本。这将是卷积的输入数据。
- 反转内核,
c
并将其与切片数据一起提供给signal.convolve2d
。
请注意,这些是为了分别计算 a
和 b
。
这是实现 -
import numpy as np
from scipy import signal
# Slices for convolutions to get a and b respectively
s1 = rho[1:nx-1,2:nz-3]
s2 = rho[0:nx-2,2:nz-3]
kernel = c[:,None][::-1] # convolution kernel
# Setup output arrays and fill them with convolution results
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
a[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
b[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')
如果您不需要输出数组边界周围的额外零,您可以直接使用 signal.convolve2d
的输出,这必须进一步提高性能。
运行时测试
In [532]: %timeit loop_based(nx, nz, c, rho)
1 loops, best of 3: 1.52 s per loop
In [533]: %timeit numba1(nx, nz, c, rho)
1 loops, best of 3: 282 ms per loop
In [534]: %timeit numba2(nx, nz, c, rho)
1 loops, best of 3: 509 ms per loop
In [535]: %timeit conv_based(nx, nz, c, rho)
10 loops, best of 3: 15.5 ms per loop
因此,对于实际输入数据大小,所提出的基于卷积的方法比循环代码快 100x
,大约 20x
优于基于 numba
的最快方法 numba1
.
Numba 在 nopython
mode 中非常快,但是对于您的代码,它必须退回到 object
模式,这要慢得多。如果将 nopython=True
传递给 jit
装饰器,就会看到这种情况。
如果您将 a
和 b
作为参数传递,它会在 nopython
模式下编译(至少在 Numba 版本 0.18.2 中):
import numba as nb
@nb.jit(nopython=True)
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in range(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
@nb.jit(nopython=True)
def numba2(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
请注意,在 release notes 中提到 autojit
被弃用,取而代之的是 jit
。
显然你还不满意。那么基于stride_tricks
的解决方案怎么样?
from numpy.lib.stride_tricks import as_strided
def stridetrick_einsum(c, rho, out):
ws = len(c)
nx, nz = rho.shape
shape = (nx-ws+1, ws, nz)
strides = (rho.strides[0],) + rho.strides
rho_windowed = as_strided(rho, shape, strides)
np.einsum('j,ijk->ik', c, rho_windowed, out=out)
a = np.zeros((nx, nz))
stridetrick_einsum(c, rho[1:-1,2:-3], a[2:-3,2:-3])
b = np.zeros((nx, nz))
stridetrick_einsum(c, rho[0:-2,2:-3], b[2:-3,2:-3])
另外,由于a
和b
显然几乎完全相同,所以可以一次性计算出来,然后将值复制过来:
a = np.zeros((nx, nz))
stridetrick_einsum(c, rho[:-1,2:-3], a[1:-3,2:-3])
b = np.zeros((nx, nz))
b[2:-3,2:-3] = a[1:-4,2:-3]
a[1,2:-3] = 0.0
我正在尝试优化以下循环:
def numpy(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum(c*rho[ix-1:ix+3, iz])
b[ix, iz] = sum(c*rho[ix-2:ix+2, iz])
return a, b
我尝试了不同的解决方案,发现使用 numba 计算乘积之和可以带来更好的性能:
import numpy as np
import numba as nb
import time
@nb.autojit
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in range(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
def numba1(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.autojit
def numba2(nx, nz, c, rho):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
nx = 1024
nz = 256
rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
ti = time.clock()
a, b = numpy(nx, nz, c, rho)
print 'Time numpy : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
a, b = numba1(nx, nz, c, rho)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
a, b = numba2(nx, nz, c, rho)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`
这导致
Time numpy : 4.1595
Time numba1 : 0.6993
Time numba2 : 1.0135
使用 numba 版本的求和函数 (sum_opt) 表现非常好。但我想知道为什么双循环函数 (numba2) 的 numba 版本会导致执行时间变慢。我尝试使用 jit 而不是 autojit,指定参数类型,但更糟。
我还注意到,首先在最小循环上循环比在最大循环上首先循环慢。有什么解释吗?
无论是,我确信这个双循环函数可以改进很多矢量化问题(如 this)或使用另一种方法(映射?),但我对这些方法有点困惑.
在我代码的其他部分,我使用了 numba 和 numpy 切片方法来替换所有显式循环,但在这种特殊情况下,我不知道如何设置它。
有什么想法吗?
编辑
感谢您的所有评论。我在这个问题上做了一些工作:
import numba as nb
import numpy as np
from scipy import signal
import time
@nb.jit(['float64(float64[:], float64[:])'], nopython=True)
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in xrange(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
@nb.autojit
def numba1(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.jit(nopython=True)
def numba2(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3a(nx, nz, c, rho, a):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
return a
@nb.jit(['float64[:,:](int16, int16, float64[:], float64[:,:], float64[:,:])'], nopython=True)
def numba3b(nx, nz, c, rho, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return b
def convol(nx, nz, c, aa, bb):
s1 = rho[1:nx-1,2:nz-3]
s2 = rho[0:nx-2,2:nz-3]
kernel = c[:,None][::-1]
aa[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
bb[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')
return aa, bb
nx = 1024
nz = 256
rho = np.random.rand(nx, nz)
c = np.random.rand(4)
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
ti = time.clock()
for i in range(1000):
a, b = numba1(nx, nz, c, rho, a, b)
print 'Time numba1 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a, b = numba2(nx, nz, c, rho, a, b)
print 'Time numba2 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a = numba3a(nx, nz, c, rho, a)
b = numba3b(nx, nz, c, rho, b)
print 'Time numba3 : ' + `round(time.clock() - ti, 4)`
ti = time.clock()
for i in range(1000):
a, b = convol(nx, nz, c, a, b)
print 'Time convol : ' + `round(time.clock() - ti, 4)`
您的解决方案非常优雅 Divakar,但我必须在我的代码中大量使用此功能。因此,对于 1000 次迭代,这导致
Time numba1 : 3.2487
Time numba2 : 3.7012
Time numba3 : 3.2088
Time convol : 22.7696
autojit
和 jit
非常接近。
但是,当使用 jit
时,指定所有参数类型似乎很重要。
我不知道当函数有多个输出时,是否有办法在 jit
装饰器中指定参数类型。有人吗?
目前我没有找到除使用 numba 之外的其他解决方案。欢迎新想法!
如 continuum 博客 performance section 中所述,autojit
即时编译,而 jit
提前编译:
Numba can compile just-in-time with the autojit decorator, or ahead of time with the jit decorator
这意味着在许多情况下,autojit
意味着编译器可以对其编译的代码做出更有根据的猜测,然后进行优化。我知道提前进行即时编译听起来很矛盾,但是嘿。
But I am wondering why the numba version of the double loop function (numba2) leads to slower execution times
Numba 不会提高任意函数调用的性能。虽然我不能肯定地说,但我的猜测是 JIT 编译的开销超过了这样做的好处(如果有任何好处的话)。
I also noticed that looping first on the smallest loop is slower than looping first on the biggest loop. Is there any explanation ?
这可能是由于 cache miss。二维数组被分配为大小为 rows * columns
的连续内存块。获取到缓存的内容基于时间(最近使用过的内容)和空间(内存中与已使用内容接近的内容)局部性的组合,即被认为接下来要使用的内容。
首先遍历行时,您将按照数据在内存中出现的顺序进行遍历。当首先遍历列时,您每次 "skip" 内存中行的宽度,这使得您正在访问的内存位置不太可能被提取到缓存中。
2D array: [[1,2,3],[4,5,6],[7,8,9]]
In memory: 1 2 3 4 5 6 7 8 9
让我们假设一个过于简单、愚蠢的缓存获取算法,它获取 3 个后续内存位置。
迭代行优先:
In memory: 1 2 3 | 4 5 6 | 7 8 9
Accessed: 1 2 3 | 4 5 6 | 7 8 9
Cache miss: - - - | - - - | - - -
迭代列优先:
In memory: 1 2 3 | 4 5 6 | 7 8 9
Accessed: 1 4 7 | 2 5 8 | 3 6 9
Cache miss: - - - | x x x | x x x
您没有充分利用 numpy 的功能。 numpythonic 处理问题的方式类似于:
cs = np.zeros((nx+1, nz))
np.cumsum(c*rho, axis=0, out=cs[1:])
aa = cs[5:, 2:-3] - cs[1:-4, 2:-3]
bb = cs[4:-1, 2:-3] - cs[:-5, 2:-3]
aa
现在将占据 a
数组的中心非零部分:
>>> a[:5, :5]
array([[ 0. , 0. , 0. , 0. , 0. ],
[ 0. , 0. , 0. , 0. , 0. ],
[ 0. , 0. , 2.31296595, 2.15743042, 2.5853117 ],
[ 0. , 0. , 2.02697233, 2.83191016, 2.58819583],
[ 0. , 0. , 2.4086584 , 2.45175615, 2.19628507]])
>>>aa[:3, :3]
array([[ 2.31296595, 2.15743042, 2.5853117 ],
[ 2.02697233, 2.83191016, 2.58819583],
[ 2.4086584 , 2.45175615, 2.19628507]])
bb
和 b
也类似。
在我的系统上,使用您的示例输入,此代码的运行速度比您的 numpy
函数快 300 倍以上。根据您的时间安排,这将比 numba 快一到两个数量级。
你基本上是在那里执行二维卷积,只是你的内核没有像通常的 convolution
操作那样反转。
所以,基本上,我们需要做两件事来使用 signal.convolve2d
来解决我们的案例 -
- 将输入数组
rho
切片为 select 它的一部分,用于您的代码的原始循环版本。这将是卷积的输入数据。 - 反转内核,
c
并将其与切片数据一起提供给signal.convolve2d
。
请注意,这些是为了分别计算 a
和 b
。
这是实现 -
import numpy as np
from scipy import signal
# Slices for convolutions to get a and b respectively
s1 = rho[1:nx-1,2:nz-3]
s2 = rho[0:nx-2,2:nz-3]
kernel = c[:,None][::-1] # convolution kernel
# Setup output arrays and fill them with convolution results
a = np.zeros((nx, nz))
b = np.zeros((nx, nz))
a[2:nx-3,2:nz-3] = signal.convolve2d(s1, kernel, boundary='symm', mode='valid')
b[2:nx-3,2:nz-3] = signal.convolve2d(s2, kernel, boundary='symm', mode='valid')
如果您不需要输出数组边界周围的额外零,您可以直接使用 signal.convolve2d
的输出,这必须进一步提高性能。
运行时测试
In [532]: %timeit loop_based(nx, nz, c, rho)
1 loops, best of 3: 1.52 s per loop
In [533]: %timeit numba1(nx, nz, c, rho)
1 loops, best of 3: 282 ms per loop
In [534]: %timeit numba2(nx, nz, c, rho)
1 loops, best of 3: 509 ms per loop
In [535]: %timeit conv_based(nx, nz, c, rho)
10 loops, best of 3: 15.5 ms per loop
因此,对于实际输入数据大小,所提出的基于卷积的方法比循环代码快 100x
,大约 20x
优于基于 numba
的最快方法 numba1
.
Numba 在 nopython
mode 中非常快,但是对于您的代码,它必须退回到 object
模式,这要慢得多。如果将 nopython=True
传递给 jit
装饰器,就会看到这种情况。
如果您将 a
和 b
作为参数传递,它会在 nopython
模式下编译(至少在 Numba 版本 0.18.2 中):
import numba as nb
@nb.jit(nopython=True)
def sum_opt(arr1, arr2):
s = arr1[0]*arr2[0]
for i in range(1, len(arr1)):
s+=arr1[i]*arr2[i]
return s
@nb.jit(nopython=True)
def numba2(nx, nz, c, rho, a, b):
for ix in range(2, nx-3):
for iz in range(2, nz-3):
a[ix, iz] = sum_opt(c, rho[ix-1:ix+3, iz])
b[ix, iz] = sum_opt(c, rho[ix-2:ix+2, iz])
return a, b
请注意,在 release notes 中提到 autojit
被弃用,取而代之的是 jit
。
显然你还不满意。那么基于stride_tricks
的解决方案怎么样?
from numpy.lib.stride_tricks import as_strided
def stridetrick_einsum(c, rho, out):
ws = len(c)
nx, nz = rho.shape
shape = (nx-ws+1, ws, nz)
strides = (rho.strides[0],) + rho.strides
rho_windowed = as_strided(rho, shape, strides)
np.einsum('j,ijk->ik', c, rho_windowed, out=out)
a = np.zeros((nx, nz))
stridetrick_einsum(c, rho[1:-1,2:-3], a[2:-3,2:-3])
b = np.zeros((nx, nz))
stridetrick_einsum(c, rho[0:-2,2:-3], b[2:-3,2:-3])
另外,由于a
和b
显然几乎完全相同,所以可以一次性计算出来,然后将值复制过来:
a = np.zeros((nx, nz))
stridetrick_einsum(c, rho[:-1,2:-3], a[1:-3,2:-3])
b = np.zeros((nx, nz))
b[2:-3,2:-3] = a[1:-4,2:-3]
a[1,2:-3] = 0.0