在 numpy 中向量化二维移动 window,包括边

Vectorized 2-D moving window in numpy including edges

我意识到我的问题与 Vectorized moving window on 2D array in numpy 非常相似 ,但那里的答案并不能完全满足我的需求。

是否可以进行包含所谓边缘效应的矢量化二维移动 window(滚动 window)?最有效的方法是什么?

也就是说,我想在我的网格上滑动移动 window 的中心,这样中心就可以移动到网格中的每个单元格上。当沿着网格的边缘移动时,此操作将 return 仅 window 与网格重叠的部分。 window 完全在网格内,完整的 window 是 returned。例如,如果我有网格:

array([[1,2,3,4],
       [2,3,4,5],
       [3,4,5,6],
       [4,5,6,7]])

…我想使用以该点为中心的 3x3 window 对该网格中的每个点进行采样,操作应该 return 一系列数组,或者,理想情况下,将一系列视图放入原数组中,如下:

array([[1,2],    array([[1,2,3],    array([[2,3,4],    array([[3,4],
       [2,3]])          [2,3,4]])          [3,4,5]])          [4,5]])

array([[1,2],    array([[1,2,3],    array([[2,3,4],    array([[3,4],
       [2,3],           [2,3,4],           [3,4,5],           [4,5],
       [3,4]])          [3,4,5]])          [4,5,6]])          [5,6]])

array([[2,3],    array([[2,3,4],    array([[3,4,5],    array([[4,5],
       [3,4],           [3,4,5],           [4,5,6],           [5,6],
       [4,5]])          [4,5,6]])          [5,6,7]])          [6,7]])

array([[3,4],    array([[3,4,5],    array([[4,5,6],    array([[5,6],
       [4,5]])          [4,5,6]])          [5,6,7]])          [6,7]])

因为我需要多次执行此操作,所以速度很重要,理想的解决方案是矢量化操作。

这不是您问题的严格答案,因为它没有矢量化,但希望它是任何其他潜在解决方案的有用基准(图像处理库中肯定有一些东西?)

无论如何,我已经将 window 实现为一个循环,它将 window 的平均值与输出到一个新数组中。输入是一个数组和 window 大小 +/- 当前索引。一个版本直接使用 Python 和 Numpy,另一个使用 numba 编译。

def mw_mean(in_arr,out_arr,x_win,y_win):
    xn,yn = in_arr.shape
    for x in range(xn):
        xmin = max([0,x - x_win])
        xmax = min([xn, x + x_win + 1])
        for y in range(yn):
            ymin = max([0,y - y_win])
            ymax = min([yn, y + y_win + 1])

            out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
    return out_arr



@jit("i4[:,:](i4[:,:],i4[:,:],i4,i4)", nopython = True)
def mw_mean_numba(in_arr,out_arr,x_win,y_win):
    xn,yn = in_arr.shape
    for x in range(xn):
        xmin = max(0,x - x_win)
        xmax = min(xn, x + x_win + 1)
        for y in range(yn):
            ymin = max(0,y - y_win)
            ymax = min(yn, y + y_win + 1)

            out_arr[x,y] = in_arr[xmin:xmax, ymin:ymax].mean()
    return out_arr

这是针对三种不同的数组大小进行测试的——您的原始测试用例和两个较大的测试用例(100x100 和 1000x1000):

a = np.array([[1,2,3,4], [2,3,4,5], [3,4,5,6], [4,5,6,7]])
b = np.random.randint(1,7, size = (100,100))
c = np.random.randint(1,7, size = (1000,1000))

aout,bout,cout = np.zeros_like(a),np.zeros_like(b),np.zeros_like(c)

x_win = 1
y_win = 1

没有编译的运行时间:

%timeit mw_mean(a,aout,x_win,y_win)
1000 loops, best of 3: 225 µs per loop

%timeit mw_mean(b,bout,x_win,y_win)
10 loops, best of 3: 137 ms per loop

%timeit mw_mean(c,cout,x_win,y_win)
1 loop, best of 3: 14.1 s per loop

编译运行时间:

%timeit mw_mean_numba(a,aout,x_win,y_win)
1000000 loops, best of 3: 1.22 µs per loop

%timeit mw_mean_numba(b,bout,x_win,y_win)
1000 loops, best of 3: 550 µs per loop

%timeit mw_mean_numba(c,cout,x_win,y_win)
10 loops, best of 3: 55.1 ms per loop

编辑:之前的版本是在原地修改数组,这显然是滚动 window 的大禁忌。基准保持不变。

您可以定义一个生成生成器的函数并使用它。 window 将是你想要的形状的底部除以 2,技巧就是在你沿着行和列移动时沿着 window 索引数组。

def window(arr, shape=(3, 3)):
    # Find row and column window sizes
    r_win = np.floor(shape[0] / 2).astype(int)
    c_win = np.floor(shape[1] / 2).astype(int)
    x, y = arr.shape
     for i in range(x):
         xmin = max(0, i - r_win)
         xmax = min(x, i + r_win + 1)
         for j in range(y):
             ymin = max(0, j - c_win)
             ymax = min(y, j + c_win + 1)
             yield arr[xmin:xmax, ymin:ymax]

你可以像这样使用这个函数:

arr = np.array([[1,2,3,4],
               [2,3,4,5],
               [3,4,5,6],
               [4,5,6,7]])
gen = window(arr)
next(gen)
array([[1, 2],
       [2, 3]])

通过生成器生成示例中的所有 windows。

它没有矢量化,但我不确定是否存在 returns 不同大小数组的矢量化函数。正如@PaulPanzer 指出的那样,您可以将数组填充到您需要的大小,并使用 np.lib.stride_tricks.as_strided 生成切片视图。像这样:

def rolling_window(a, shape):
    s = (a.shape[0] - shape[0] + 1,) + (a.shape[1] - shape[1] + 1,) + shape
    strides = a.strides + a.strides
    return np.lib.stride_tricks.as_strided(a, shape=s, strides=strides)

def window2(arr, shape=(3, 3)):
    r_extra = np.floor(shape[0] / 2).astype(int)
    c_extra = np.floor(shape[1] / 2).astype(int)
    out = np.empty((arr.shape[0] + 2 * r_extra, arr.shape[1] + 2 * c_extra))
    out[:] = np.nan
    out[r_extra:-r_extra, c_extra:-c_extra] = arr
    view = rolling_window(out, shape)
    return view

window2(arr, (3,3))
array([[[[ nan,  nan,  nan],
         [ nan,   1.,   2.],
         [ nan,   2.,   3.]],

        [[ nan,  nan,  nan],
         [  1.,   2.,   3.],
         [  2.,   3.,   4.]],

        [[ nan,  nan,  nan],
         [  2.,   3.,   4.],
         [  3.,   4.,   5.]],

        [[ nan,  nan,  nan],
         [  3.,   4.,  nan],
         [  4.,   5.,  nan]]],


       [[[ nan,   1.,   2.],
         [ nan,   2.,   3.],
         [ nan,   3.,   4.]],

        [[  1.,   2.,   3.],
         [  2.,   3.,   4.],
         [  3.,   4.,   5.]],

        [[  2.,   3.,   4.],
         [  3.,   4.,   5.],
         [  4.,   5.,   6.]],

        [[  3.,   4.,  nan],
         [  4.,   5.,  nan],
         [  5.,   6.,  nan]]],


       [[[ nan,   2.,   3.],
         [ nan,   3.,   4.],
         [ nan,   4.,   5.]],

        [[  2.,   3.,   4.],
         [  3.,   4.,   5.],
         [  4.,   5.,   6.]],

        [[  3.,   4.,   5.],
         [  4.,   5.,   6.],
         [  5.,   6.,   7.]],

        [[  4.,   5.,  nan],
         [  5.,   6.,  nan],
         [  6.,   7.,  nan]]],


       [[[ nan,   3.,   4.],
         [ nan,   4.,   5.],
         [ nan,  nan,  nan]],

        [[  3.,   4.,   5.],
         [  4.,   5.,   6.],
         [ nan,  nan,  nan]],

        [[  4.,   5.,   6.],
         [  5.,   6.,   7.],
         [ nan,  nan,  nan]],

        [[  5.,   6.,  nan],
         [  6.,   7.,  nan],
         [ nan,  nan,  nan]]]])

此版本用 np.nan 填充边缘以避免与数组中的任何其他值混淆。给定的数组比 window 函数快大约 3 倍,但我不确定填充输出将如何影响你想在下游做的任何事情。