在 numpy 中向量化二维移动 window,包括边
Vectorized 2-D moving window in numpy including edges
我意识到我的问题与 Vectorized moving window on 2D array in numpy 非常相似
,但那里的答案并不能完全满足我的需求。
是否可以进行包含所谓边缘效应的矢量化二维移动 window(滚动 window)?最有效的方法是什么?
也就是说,我想在我的网格上滑动移动 window 的中心,这样中心就可以移动到网格中的每个单元格上。当沿着网格的边缘移动时,此操作将 return 仅 window 与网格重叠的部分。 window 完全在网格内,完整的 window 是 returned。例如,如果我有网格:
array([[1,2,3,4],
[2,3,4,5],
[3,4,5,6],
[4,5,6,7]])
…我想使用以该点为中心的 3x3
window 对该网格中的每个点进行采样,操作应该 return 一系列数组,或者,理想情况下,将一系列视图放入原数组中,如下:
array([[1,2], array([[1,2,3], array([[2,3,4], array([[3,4],
[2,3]]) [2,3,4]]) [3,4,5]]) [4,5]])
array([[1,2], array([[1,2,3], array([[2,3,4], array([[3,4],
[2,3], [2,3,4], [3,4,5], [4,5],
[3,4]]) [3,4,5]]) [4,5,6]]) [5,6]])
array([[2,3], array([[2,3,4], array([[3,4,5], array([[4,5],
[3,4], [3,4,5], [4,5,6], [5,6],
[4,5]]) [4,5,6]]) [5,6,7]]) [6,7]])
array([[3,4], array([[3,4,5], array([[4,5,6], array([[5,6],
[4,5]]) [4,5,6]]) [5,6,7]]) [6,7]])
因为我需要多次执行此操作,所以速度很重要,理想的解决方案是矢量化操作。
这不是您问题的严格答案,因为它没有矢量化,但希望它是任何其他潜在解决方案的有用基准(图像处理库中肯定有一些东西?)
无论如何,我已经将 window 实现为一个循环,它将 window 的平均值与输出到一个新数组中。输入是一个数组和 window 大小 +/- 当前索引。一个版本直接使用 Python 和 Numpy,另一个使用 numba 编译。
def mw_mean(in_arr,out_arr,x_win,y_win):
xn,yn = in_arr.shape
for x in range(xn):
xmin = max([0,x - x_win])
xmax = min([xn, x + x_win + 1])
for y in range(yn):
ymin = max([0,y - y_win])
ymax = min([yn, y + y_win + 1])
out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
return out_arr
@jit("i4[:,:](i4[:,:],i4[:,:],i4,i4)", nopython = True)
def mw_mean_numba(in_arr,out_arr,x_win,y_win):
xn,yn = in_arr.shape
for x in range(xn):
xmin = max(0,x - x_win)
xmax = min(xn, x + x_win + 1)
for y in range(yn):
ymin = max(0,y - y_win)
ymax = min(yn, y + y_win + 1)
out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
return out_arr
这是针对三种不同的数组大小进行测试的——您的原始测试用例和两个较大的测试用例(100x100 和 1000x1000):
a = np.array([[1,2,3,4], [2,3,4,5], [3,4,5,6], [4,5,6,7]])
b = np.random.randint(1,7, size = (100,100))
c = np.random.randint(1,7, size = (1000,1000))
aout,bout,cout = np.zeros_like(a),np.zeros_like(b),np.zeros_like(c)
x_win = 1
y_win = 1
没有编译的运行时间:
%timeit mw_mean(a,aout,x_win,y_win)
1000 loops, best of 3: 225 µs per loop
%timeit mw_mean(b,bout,x_win,y_win)
10 loops, best of 3: 137 ms per loop
%timeit mw_mean(c,cout,x_win,y_win)
1 loop, best of 3: 14.1 s per loop
编译运行时间:
%timeit mw_mean_numba(a,aout,x_win,y_win)
1000000 loops, best of 3: 1.22 µs per loop
%timeit mw_mean_numba(b,bout,x_win,y_win)
1000 loops, best of 3: 550 µs per loop
%timeit mw_mean_numba(c,cout,x_win,y_win)
10 loops, best of 3: 55.1 ms per loop
编辑:之前的版本是在原地修改数组,这显然是滚动 window 的大禁忌。基准保持不变。
您可以定义一个生成生成器的函数并使用它。 window 将是你想要的形状的底部除以 2,技巧就是在你沿着行和列移动时沿着 window 索引数组。
def window(arr, shape=(3, 3)):
# Find row and column window sizes
r_win = np.floor(shape[0] / 2).astype(int)
c_win = np.floor(shape[1] / 2).astype(int)
x, y = arr.shape
for i in range(x):
xmin = max(0, i - r_win)
xmax = min(x, i + r_win + 1)
for j in range(y):
ymin = max(0, j - c_win)
ymax = min(y, j + c_win + 1)
yield arr[xmin:xmax, ymin:ymax]
你可以像这样使用这个函数:
arr = np.array([[1,2,3,4],
[2,3,4,5],
[3,4,5,6],
[4,5,6,7]])
gen = window(arr)
next(gen)
array([[1, 2],
[2, 3]])
通过生成器生成示例中的所有 windows。
它没有矢量化,但我不确定是否存在 returns 不同大小数组的矢量化函数。正如@PaulPanzer 指出的那样,您可以将数组填充到您需要的大小,并使用 np.lib.stride_tricks.as_strided
生成切片视图。像这样:
def rolling_window(a, shape):
s = (a.shape[0] - shape[0] + 1,) + (a.shape[1] - shape[1] + 1,) + shape
strides = a.strides + a.strides
return np.lib.stride_tricks.as_strided(a, shape=s, strides=strides)
def window2(arr, shape=(3, 3)):
r_extra = np.floor(shape[0] / 2).astype(int)
c_extra = np.floor(shape[1] / 2).astype(int)
out = np.empty((arr.shape[0] + 2 * r_extra, arr.shape[1] + 2 * c_extra))
out[:] = np.nan
out[r_extra:-r_extra, c_extra:-c_extra] = arr
view = rolling_window(out, shape)
return view
window2(arr, (3,3))
array([[[[ nan, nan, nan],
[ nan, 1., 2.],
[ nan, 2., 3.]],
[[ nan, nan, nan],
[ 1., 2., 3.],
[ 2., 3., 4.]],
[[ nan, nan, nan],
[ 2., 3., 4.],
[ 3., 4., 5.]],
[[ nan, nan, nan],
[ 3., 4., nan],
[ 4., 5., nan]]],
[[[ nan, 1., 2.],
[ nan, 2., 3.],
[ nan, 3., 4.]],
[[ 1., 2., 3.],
[ 2., 3., 4.],
[ 3., 4., 5.]],
[[ 2., 3., 4.],
[ 3., 4., 5.],
[ 4., 5., 6.]],
[[ 3., 4., nan],
[ 4., 5., nan],
[ 5., 6., nan]]],
[[[ nan, 2., 3.],
[ nan, 3., 4.],
[ nan, 4., 5.]],
[[ 2., 3., 4.],
[ 3., 4., 5.],
[ 4., 5., 6.]],
[[ 3., 4., 5.],
[ 4., 5., 6.],
[ 5., 6., 7.]],
[[ 4., 5., nan],
[ 5., 6., nan],
[ 6., 7., nan]]],
[[[ nan, 3., 4.],
[ nan, 4., 5.],
[ nan, nan, nan]],
[[ 3., 4., 5.],
[ 4., 5., 6.],
[ nan, nan, nan]],
[[ 4., 5., 6.],
[ 5., 6., 7.],
[ nan, nan, nan]],
[[ 5., 6., nan],
[ 6., 7., nan],
[ nan, nan, nan]]]])
此版本用 np.nan
填充边缘以避免与数组中的任何其他值混淆。给定的数组比 window
函数快大约 3 倍,但我不确定填充输出将如何影响你想在下游做的任何事情。
我意识到我的问题与 Vectorized moving window on 2D array in numpy 非常相似 ,但那里的答案并不能完全满足我的需求。
是否可以进行包含所谓边缘效应的矢量化二维移动 window(滚动 window)?最有效的方法是什么?
也就是说,我想在我的网格上滑动移动 window 的中心,这样中心就可以移动到网格中的每个单元格上。当沿着网格的边缘移动时,此操作将 return 仅 window 与网格重叠的部分。 window 完全在网格内,完整的 window 是 returned。例如,如果我有网格:
array([[1,2,3,4],
[2,3,4,5],
[3,4,5,6],
[4,5,6,7]])
…我想使用以该点为中心的 3x3
window 对该网格中的每个点进行采样,操作应该 return 一系列数组,或者,理想情况下,将一系列视图放入原数组中,如下:
array([[1,2], array([[1,2,3], array([[2,3,4], array([[3,4],
[2,3]]) [2,3,4]]) [3,4,5]]) [4,5]])
array([[1,2], array([[1,2,3], array([[2,3,4], array([[3,4],
[2,3], [2,3,4], [3,4,5], [4,5],
[3,4]]) [3,4,5]]) [4,5,6]]) [5,6]])
array([[2,3], array([[2,3,4], array([[3,4,5], array([[4,5],
[3,4], [3,4,5], [4,5,6], [5,6],
[4,5]]) [4,5,6]]) [5,6,7]]) [6,7]])
array([[3,4], array([[3,4,5], array([[4,5,6], array([[5,6],
[4,5]]) [4,5,6]]) [5,6,7]]) [6,7]])
因为我需要多次执行此操作,所以速度很重要,理想的解决方案是矢量化操作。
这不是您问题的严格答案,因为它没有矢量化,但希望它是任何其他潜在解决方案的有用基准(图像处理库中肯定有一些东西?)
无论如何,我已经将 window 实现为一个循环,它将 window 的平均值与输出到一个新数组中。输入是一个数组和 window 大小 +/- 当前索引。一个版本直接使用 Python 和 Numpy,另一个使用 numba 编译。
def mw_mean(in_arr,out_arr,x_win,y_win):
xn,yn = in_arr.shape
for x in range(xn):
xmin = max([0,x - x_win])
xmax = min([xn, x + x_win + 1])
for y in range(yn):
ymin = max([0,y - y_win])
ymax = min([yn, y + y_win + 1])
out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
return out_arr
@jit("i4[:,:](i4[:,:],i4[:,:],i4,i4)", nopython = True)
def mw_mean_numba(in_arr,out_arr,x_win,y_win):
xn,yn = in_arr.shape
for x in range(xn):
xmin = max(0,x - x_win)
xmax = min(xn, x + x_win + 1)
for y in range(yn):
ymin = max(0,y - y_win)
ymax = min(yn, y + y_win + 1)
out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
return out_arr
这是针对三种不同的数组大小进行测试的——您的原始测试用例和两个较大的测试用例(100x100 和 1000x1000):
a = np.array([[1,2,3,4], [2,3,4,5], [3,4,5,6], [4,5,6,7]])
b = np.random.randint(1,7, size = (100,100))
c = np.random.randint(1,7, size = (1000,1000))
aout,bout,cout = np.zeros_like(a),np.zeros_like(b),np.zeros_like(c)
x_win = 1
y_win = 1
没有编译的运行时间:
%timeit mw_mean(a,aout,x_win,y_win)
1000 loops, best of 3: 225 µs per loop
%timeit mw_mean(b,bout,x_win,y_win)
10 loops, best of 3: 137 ms per loop
%timeit mw_mean(c,cout,x_win,y_win)
1 loop, best of 3: 14.1 s per loop
编译运行时间:
%timeit mw_mean_numba(a,aout,x_win,y_win)
1000000 loops, best of 3: 1.22 µs per loop
%timeit mw_mean_numba(b,bout,x_win,y_win)
1000 loops, best of 3: 550 µs per loop
%timeit mw_mean_numba(c,cout,x_win,y_win)
10 loops, best of 3: 55.1 ms per loop
编辑:之前的版本是在原地修改数组,这显然是滚动 window 的大禁忌。基准保持不变。
您可以定义一个生成生成器的函数并使用它。 window 将是你想要的形状的底部除以 2,技巧就是在你沿着行和列移动时沿着 window 索引数组。
def window(arr, shape=(3, 3)):
# Find row and column window sizes
r_win = np.floor(shape[0] / 2).astype(int)
c_win = np.floor(shape[1] / 2).astype(int)
x, y = arr.shape
for i in range(x):
xmin = max(0, i - r_win)
xmax = min(x, i + r_win + 1)
for j in range(y):
ymin = max(0, j - c_win)
ymax = min(y, j + c_win + 1)
yield arr[xmin:xmax, ymin:ymax]
你可以像这样使用这个函数:
arr = np.array([[1,2,3,4],
[2,3,4,5],
[3,4,5,6],
[4,5,6,7]])
gen = window(arr)
next(gen)
array([[1, 2],
[2, 3]])
通过生成器生成示例中的所有 windows。
它没有矢量化,但我不确定是否存在 returns 不同大小数组的矢量化函数。正如@PaulPanzer 指出的那样,您可以将数组填充到您需要的大小,并使用 np.lib.stride_tricks.as_strided
生成切片视图。像这样:
def rolling_window(a, shape):
s = (a.shape[0] - shape[0] + 1,) + (a.shape[1] - shape[1] + 1,) + shape
strides = a.strides + a.strides
return np.lib.stride_tricks.as_strided(a, shape=s, strides=strides)
def window2(arr, shape=(3, 3)):
r_extra = np.floor(shape[0] / 2).astype(int)
c_extra = np.floor(shape[1] / 2).astype(int)
out = np.empty((arr.shape[0] + 2 * r_extra, arr.shape[1] + 2 * c_extra))
out[:] = np.nan
out[r_extra:-r_extra, c_extra:-c_extra] = arr
view = rolling_window(out, shape)
return view
window2(arr, (3,3))
array([[[[ nan, nan, nan],
[ nan, 1., 2.],
[ nan, 2., 3.]],
[[ nan, nan, nan],
[ 1., 2., 3.],
[ 2., 3., 4.]],
[[ nan, nan, nan],
[ 2., 3., 4.],
[ 3., 4., 5.]],
[[ nan, nan, nan],
[ 3., 4., nan],
[ 4., 5., nan]]],
[[[ nan, 1., 2.],
[ nan, 2., 3.],
[ nan, 3., 4.]],
[[ 1., 2., 3.],
[ 2., 3., 4.],
[ 3., 4., 5.]],
[[ 2., 3., 4.],
[ 3., 4., 5.],
[ 4., 5., 6.]],
[[ 3., 4., nan],
[ 4., 5., nan],
[ 5., 6., nan]]],
[[[ nan, 2., 3.],
[ nan, 3., 4.],
[ nan, 4., 5.]],
[[ 2., 3., 4.],
[ 3., 4., 5.],
[ 4., 5., 6.]],
[[ 3., 4., 5.],
[ 4., 5., 6.],
[ 5., 6., 7.]],
[[ 4., 5., nan],
[ 5., 6., nan],
[ 6., 7., nan]]],
[[[ nan, 3., 4.],
[ nan, 4., 5.],
[ nan, nan, nan]],
[[ 3., 4., 5.],
[ 4., 5., 6.],
[ nan, nan, nan]],
[[ 4., 5., 6.],
[ 5., 6., 7.],
[ nan, nan, nan]],
[[ 5., 6., nan],
[ 6., 7., nan],
[ nan, nan, nan]]]])
此版本用 np.nan
填充边缘以避免与数组中的任何其他值混淆。给定的数组比 window
函数快大约 3 倍,但我不确定填充输出将如何影响你想在下游做的任何事情。