Numpy Vectorization of sliding-window 操作
Numpy Vectorization of sliding-window operation
我有以下 numpy 数组:
arr_1 = [[1,2],[3,4],[5,6]] # 3 X 2
arr_2 = [[0.5,0.6],[0.7,0.8],[0.9,1.0],[1.1,1.2],[1.3,1.4]] # 5 X 2
arr_1
显然是一个 3 X 2
数组,而 arr_2
是一个 5 X 2
数组。
现在没有循环,我想按元素乘以 arr_1 和 arr_2 以便我将滑动 window 技术(window 大小 3)应用于 arr_2.
Example:
Multiplication 1: np.multiply(arr_1,arr_2[:3,:])
Multiplication 2: np.multiply(arr_1,arr_2[1:4,:])
Multiplication 3: np.multiply(arr_1,arr_2[2:5,:])
我想以某种矩阵乘法形式执行此操作,以使其比我当前的解决方案更快:
for i in (2):
np.multiply(arr_1,arr_2[i:i+3,:])
因此,如果 arr_2 中的行数很大(大约数万),则此解决方案的扩展性并不是很好。
如有任何帮助,我们将不胜感激。
我们可以使用 NumPy broadcasting
以矢量化方式创建那些滑动窗口索引。然后,我们可以简单地用这些索引到 arr_2
以创建一个 3D
数组并与 2D
数组 arr_1
执行逐元素乘法,这反过来会带来 broadcasting
再次。
所以,我们会有一个像这样的矢量化实现 -
W = arr_1.shape[0] # Window size
idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
out = arr_1*arr_2[idx]
运行时测试验证结果-
In [143]: # Input arrays
...: arr_1 = np.random.rand(3,2)
...: arr_2 = np.random.rand(10000,2)
...:
...: def org_app(arr_1,arr_2):
...: W = arr_1.shape[0] # Window size
...: L = arr_2.shape[0]-W+1
...: out = np.empty((L,W,arr_1.shape[1]))
...: for i in range(L):
...: out[i] = np.multiply(arr_1,arr_2[i:i+W,:])
...: return out
...:
...: def vectorized_app(arr_1,arr_2):
...: W = arr_1.shape[0] # Window size
...: idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
...: return arr_1*arr_2[idx]
...:
In [144]: np.allclose(org_app(arr_1,arr_2),vectorized_app(arr_1,arr_2))
Out[144]: True
In [145]: %timeit org_app(arr_1,arr_2)
10 loops, best of 3: 47.3 ms per loop
In [146]: %timeit vectorized_app(arr_1,arr_2)
1000 loops, best of 3: 1.21 ms per loop
这是一个很好的案例来测试as_strided
和Divakar的广播速度。
In [281]: %%timeit
...: out=np.empty((L,W,arr1.shape[1]))
...: for i in range(L):
...: out[i]=np.multiply(arr1,arr2[i:i+W,:])
...:
10 loops, best of 3: 48.9 ms per loop
In [282]: %%timeit
...: idx=np.arange(L)[:,None]+np.arange(W)
...: out=arr1*arr2[idx]
...:
100 loops, best of 3: 2.18 ms per loop
In [283]: %%timeit
...: arr3=as_strided(arr2, shape=(L,W,2), strides=(16,16,8))
...: out=arr1*arr3
...:
1000 loops, best of 3: 805 µs per loop
对这些方法进行更多比较。
我有以下 numpy 数组:
arr_1 = [[1,2],[3,4],[5,6]] # 3 X 2
arr_2 = [[0.5,0.6],[0.7,0.8],[0.9,1.0],[1.1,1.2],[1.3,1.4]] # 5 X 2
arr_1
显然是一个 3 X 2
数组,而 arr_2
是一个 5 X 2
数组。
现在没有循环,我想按元素乘以 arr_1 和 arr_2 以便我将滑动 window 技术(window 大小 3)应用于 arr_2.
Example:
Multiplication 1: np.multiply(arr_1,arr_2[:3,:])
Multiplication 2: np.multiply(arr_1,arr_2[1:4,:])
Multiplication 3: np.multiply(arr_1,arr_2[2:5,:])
我想以某种矩阵乘法形式执行此操作,以使其比我当前的解决方案更快:
for i in (2):
np.multiply(arr_1,arr_2[i:i+3,:])
因此,如果 arr_2 中的行数很大(大约数万),则此解决方案的扩展性并不是很好。
如有任何帮助,我们将不胜感激。
我们可以使用 NumPy broadcasting
以矢量化方式创建那些滑动窗口索引。然后,我们可以简单地用这些索引到 arr_2
以创建一个 3D
数组并与 2D
数组 arr_1
执行逐元素乘法,这反过来会带来 broadcasting
再次。
所以,我们会有一个像这样的矢量化实现 -
W = arr_1.shape[0] # Window size
idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
out = arr_1*arr_2[idx]
运行时测试验证结果-
In [143]: # Input arrays
...: arr_1 = np.random.rand(3,2)
...: arr_2 = np.random.rand(10000,2)
...:
...: def org_app(arr_1,arr_2):
...: W = arr_1.shape[0] # Window size
...: L = arr_2.shape[0]-W+1
...: out = np.empty((L,W,arr_1.shape[1]))
...: for i in range(L):
...: out[i] = np.multiply(arr_1,arr_2[i:i+W,:])
...: return out
...:
...: def vectorized_app(arr_1,arr_2):
...: W = arr_1.shape[0] # Window size
...: idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
...: return arr_1*arr_2[idx]
...:
In [144]: np.allclose(org_app(arr_1,arr_2),vectorized_app(arr_1,arr_2))
Out[144]: True
In [145]: %timeit org_app(arr_1,arr_2)
10 loops, best of 3: 47.3 ms per loop
In [146]: %timeit vectorized_app(arr_1,arr_2)
1000 loops, best of 3: 1.21 ms per loop
这是一个很好的案例来测试as_strided
和Divakar的广播速度。
In [281]: %%timeit
...: out=np.empty((L,W,arr1.shape[1]))
...: for i in range(L):
...: out[i]=np.multiply(arr1,arr2[i:i+W,:])
...:
10 loops, best of 3: 48.9 ms per loop
In [282]: %%timeit
...: idx=np.arange(L)[:,None]+np.arange(W)
...: out=arr1*arr2[idx]
...:
100 loops, best of 3: 2.18 ms per loop
In [283]: %%timeit
...: arr3=as_strided(arr2, shape=(L,W,2), strides=(16,16,8))
...: out=arr1*arr3
...:
1000 loops, best of 3: 805 µs per loop