向量化滑动 Window 点积
Vectorize Sliding Window Dot Product
我有两个大向量(等长),我正在计算滑动 window 点积:
import numpy as np
a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66])
out = np.array(
[[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]],
[a[1]*b[1]+a[2]*b[2]+a[3]*b[3]],
[a[2]*b[2]+a[3]*b[3]+a[4]*b[4]],
[a[3]*b[3]+a[4]*b[4]+a[5]*b[5]],
])
[[154]
[319]
[550]
[847]]
当然,我可以调用点积函数,但是如果 window/vector 长度很大,那么它的效率不如下面的代码:
window = 3
result = np.empty([4,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(3):
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
[[154]
[319]
[550]
[847]]
在这里,我们利用了 i+1th
点积与 ith
点积相似的事实。也就是说,
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
如何将我的 for 循环转换为矢量化函数,以便计算可以利用来自 ith
步骤的信息,从而减少计算冗余,同时最大限度地减少所需的内存量。
更新
我实际需要:
import numpy as np
a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66, 77, 88])
out = np.array(
[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]+a[3]*b[3]]+a[4]*b[4]]+a[5]*b[5],
a[0]*b[1]+a[1]*b[2]+a[2]*b[3]+a[3]*b[4]]+a[4]*b[5]]+a[5]*b[6],
a[0]*b[2]+a[1]*b[3]+a[2]*b[4]+a[3]*b[5]]+a[4]*b[6]]+a[5]*b[7],
])
[1001
1232
1463]
因此 a
将滑过 b
并计算点积。
方法 #1
在两个输入之间的逐元素乘法上使用 np.convolve
,并使用全 1 和 size=3
-
的内核
np.convolve(a*b,np.ones(3),'valid')
方法 #2
因为我们只是对window中的元素求和,所以我们也可以使用uniform_filter
,像这样-
from scipy.ndimage.filters import uniform_filter1d as unif1d
def uniform_filter(a,W):
hW = (W-1)//2
return W*unif1d(a.astype(float),size=W, mode='constant')[hW:-hW]
out = uniform_filter(a*b,W=3)
基准测试
循环方法 -
def loopy_approach(a,b):
window = 3
N = a.size-window+1
result = np.empty([N,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(N-1):
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
return result
时间和验证 -
In [147]: a = np.random.randint(0,100,(1000))
...: b = np.random.randint(0,100,(1000))
...:
In [148]: out0 = loopy_approach(a,b).ravel()
...: out1 = np.convolve(a*b,np.ones(3),'valid')
...: out2 = uniform_filter(a*b,W=3)
...:
In [149]: np.allclose(out0,out1)
Out[149]: True
In [150]: np.allclose(out0,out2)
Out[150]: True
In [151]: %timeit loopy_approach(a,b)
...: %timeit np.convolve(a*b,np.ones(3),'valid')
...: %timeit uniform_filter(a*b,W=3)
...:
100 loops, best of 3: 2.27 ms per loop
100000 loops, best of 3: 7 µs per loop
100000 loops, best of 3: 10.2 µs per loop
您可以使用 O(n) 复杂度的部分和:
ps = np.r_[0, np.cumsum(a*b)]
ps[3:]-ps[:-3]
# array([154, 319, 550, 847])
或更接近原始 for
循环并避免非常大的部分和的变体:
k = 3
d = a*b
d[k:] -= d[:-k].copy()
np.cumsum(d)[k-1:]
# array([154, 319, 550, 847])
Update 以匹配更新后的 Q.
这现在确实是一个卷积,所以@Divakar 的解决方案或多或少适用。只是,你会直接对 a[::-1]
和 b
进行卷积。如果速度有问题,您可以尝试将 np.convolve
替换为 scipy.signal.fftconvolve
,这取决于您的操作数的大小可能会快得多。但是,对于非常小的操作数或长度差异很大的操作数,您甚至可能会损失一些速度,因此请务必尝试两种方法:
np.convolve(b, a[::-1], 'valid')
scipy.signal.fftconvolve(b, a[::-1], 'valid')
另一种使用 strides 的方法是:
In [12]: from numpy.lib.stride_tricks import as_strided
In [13]: def using_strides(a, b, w=3):
shape = a.shape[:-1] + (a.shape[-1] - w + 1, w)
strides = a.strides + (a.strides[-1],)
res = np.sum((as_strided(a, shape=shape, strides=strides) * \
as_strided(b, shape=shape, strides=strides)), axis=1)
return res[:, np.newaxis]
In [14]: using_strides(a, b, 3)
Out[14]:
array([[154],
[319],
[550],
[847]])
我有两个大向量(等长),我正在计算滑动 window 点积:
import numpy as np
a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66])
out = np.array(
[[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]],
[a[1]*b[1]+a[2]*b[2]+a[3]*b[3]],
[a[2]*b[2]+a[3]*b[3]+a[4]*b[4]],
[a[3]*b[3]+a[4]*b[4]+a[5]*b[5]],
])
[[154]
[319]
[550]
[847]]
当然,我可以调用点积函数,但是如果 window/vector 长度很大,那么它的效率不如下面的代码:
window = 3
result = np.empty([4,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(3):
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
[[154]
[319]
[550]
[847]]
在这里,我们利用了 i+1th
点积与 ith
点积相似的事实。也就是说,
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
如何将我的 for 循环转换为矢量化函数,以便计算可以利用来自 ith
步骤的信息,从而减少计算冗余,同时最大限度地减少所需的内存量。
更新
我实际需要:
import numpy as np
a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66, 77, 88])
out = np.array(
[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]+a[3]*b[3]]+a[4]*b[4]]+a[5]*b[5],
a[0]*b[1]+a[1]*b[2]+a[2]*b[3]+a[3]*b[4]]+a[4]*b[5]]+a[5]*b[6],
a[0]*b[2]+a[1]*b[3]+a[2]*b[4]+a[3]*b[5]]+a[4]*b[6]]+a[5]*b[7],
])
[1001
1232
1463]
因此 a
将滑过 b
并计算点积。
方法 #1
在两个输入之间的逐元素乘法上使用 np.convolve
,并使用全 1 和 size=3
-
np.convolve(a*b,np.ones(3),'valid')
方法 #2
因为我们只是对window中的元素求和,所以我们也可以使用uniform_filter
,像这样-
from scipy.ndimage.filters import uniform_filter1d as unif1d
def uniform_filter(a,W):
hW = (W-1)//2
return W*unif1d(a.astype(float),size=W, mode='constant')[hW:-hW]
out = uniform_filter(a*b,W=3)
基准测试
循环方法 -
def loopy_approach(a,b):
window = 3
N = a.size-window+1
result = np.empty([N,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(N-1):
result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
return result
时间和验证 -
In [147]: a = np.random.randint(0,100,(1000))
...: b = np.random.randint(0,100,(1000))
...:
In [148]: out0 = loopy_approach(a,b).ravel()
...: out1 = np.convolve(a*b,np.ones(3),'valid')
...: out2 = uniform_filter(a*b,W=3)
...:
In [149]: np.allclose(out0,out1)
Out[149]: True
In [150]: np.allclose(out0,out2)
Out[150]: True
In [151]: %timeit loopy_approach(a,b)
...: %timeit np.convolve(a*b,np.ones(3),'valid')
...: %timeit uniform_filter(a*b,W=3)
...:
100 loops, best of 3: 2.27 ms per loop
100000 loops, best of 3: 7 µs per loop
100000 loops, best of 3: 10.2 µs per loop
您可以使用 O(n) 复杂度的部分和:
ps = np.r_[0, np.cumsum(a*b)]
ps[3:]-ps[:-3]
# array([154, 319, 550, 847])
或更接近原始 for
循环并避免非常大的部分和的变体:
k = 3
d = a*b
d[k:] -= d[:-k].copy()
np.cumsum(d)[k-1:]
# array([154, 319, 550, 847])
Update 以匹配更新后的 Q.
这现在确实是一个卷积,所以@Divakar 的解决方案或多或少适用。只是,你会直接对 a[::-1]
和 b
进行卷积。如果速度有问题,您可以尝试将 np.convolve
替换为 scipy.signal.fftconvolve
,这取决于您的操作数的大小可能会快得多。但是,对于非常小的操作数或长度差异很大的操作数,您甚至可能会损失一些速度,因此请务必尝试两种方法:
np.convolve(b, a[::-1], 'valid')
scipy.signal.fftconvolve(b, a[::-1], 'valid')
另一种使用 strides 的方法是:
In [12]: from numpy.lib.stride_tricks import as_strided
In [13]: def using_strides(a, b, w=3):
shape = a.shape[:-1] + (a.shape[-1] - w + 1, w)
strides = a.strides + (a.strides[-1],)
res = np.sum((as_strided(a, shape=shape, strides=strides) * \
as_strided(b, shape=shape, strides=strides)), axis=1)
return res[:, np.newaxis]
In [14]: using_strides(a, b, 3)
Out[14]:
array([[154],
[319],
[550],
[847]])