使用掩码用较小的二维数组替换二维数组的部分

Replacing sections of 2D array with a smaller 2D array using masks

如何用较小的 2D numpy 数组替换大型 2D numpy 数组中模式的多个实例?

我正在寻找使用布尔掩码的矢量化解决方案,以尽量减少对性能的影响,因为我正在处理的大型数组将有数百万行长。

例如:

#Large array
largeArr = np.array([
    [0, 1, 1],
    [0, 1, 1],
    [0, 1, 1],
    [0, 0, 0],
    [0, 0, 0],
    [0, 0, 0],
    [0, 1, 1],
    [0, 1, 1],
    [0, 1, 1],
    [0, 0, 0],
    [0, 0, 0],
    [3, 2, 0],
    [3, 2, 0],
    [3, 2, 0],
    [3, 2, 0],
    [0, 0, 0],
    [0, 0, 0],
    [3, 2, 0],
    [3, 2, 0],
    [3, 2, 0],
    [3, 2, 0],
    [0, 0, 0]
])

我想用包含 [0, 1, 1] 的连续 3 行替换为

pattern1 = [
    [0, 2, 1],
    [0, 2, 2],
    [0, 2, 3]
]

然后我想用

替换包含 [3, 2, 0] 的连续 4 行的部分
pattern2 = [
    [5, 2, 1],
    [5, 3, 2],
    [5, 4, 3],
    [5, 5, 4]
]

预期结果:

[[0, 2, 1],
 [0, 2, 2],
 [0, 2, 3],
 [0, 0, 0],
 [0, 0, 0],
 [0, 0, 0],
 [0, 2, 1],
 [0, 2, 2],
 [0, 2, 3],
 [0, 0, 0],
 [0, 0, 0],
 [5, 2, 1],
 [5, 3, 2],
 [5, 4, 3],
 [5, 5, 4],
 [0, 0, 0],
 [0, 0, 0],
 [5, 2, 1],
 [5, 3, 2],
 [5, 4, 3],
 [5, 5, 4],
 [0, 0, 0]]

将有多个模式可供查找和替换,每个模式都有自己的替换数组。目的是一次循环提供的搜索行和替换模式。

搜索行始终是单行,重复次数与替换模式中的行数相同。

我假设你所有的数量都是数组,而不是列表。如果不是这种情况,请将它们包装在 np.array:

search = np.array([0, 1, 1])
pattern = np.array(pattern1)

块的大小由

给出
n = len(pattern)  # or pattern.shape[0]

我假设您只想替换不重叠的片段。因此,虽然六行 search 在输出中恰好构成了两个 pattern 实例,但七行构成了两个 pattern 实例和一个 search.[=41 实例=]

搜索模式很简单。首先创建行匹配模式的掩码:

mask = (largeArr == search).all(1)

找口罩连续运行的成语在本站被打死了。要点是使用 np.diff 查找掩码符号更改的位置,然后使用 np.flatnonzero 获取索引,然后再次使用 np.diff 计算 运行 长度。首先填充掩码以确保结果正确包含端点:

indices = np.flatnonzero(np.diff(np.r_[False, mask, False])).reshape(-1, 2)
runs = np.diff(indices, axis=1).squeeze()

请注意,为方便起见,indices 已重塑为两列。传递的 l 填充保证这是可能的。第一列是每个运行(含)的开始,而第二列是结束(不包括)。这使得计算 runs 中的 运行 长度变得微不足道。

现在您可以调整 indices 以仅包含 运行 个大小 n 或更长的元素,并且 trim 结尾元素是 [=30] 的倍数=]远离起始元素:

# runs = n * (runs // n), but for huge arrays
np.floor_divide(runs, n, out=runs)
np.multiply(runs, n, out=runs)

indices[:, 1] = indices[:, 0] + runs

您可以使用 indices = indices[np.flatnonzero(runs)]indices 中 trim 零长度 运行s,但这不是必需的。下一步是将调整后的 indices 转换回掩码:

mask = np.zeros_like(mask, dtype=np.int8)

np.uint8 dtype 允许您在掩码中存储+1 和-1,并且大小与np.bool_ 相同,这意味着如果处理得当,最终结果可以无缝被视为布尔掩码:

starts, ends = indices.T
if ends[-1] == mask.size:
    ends = ends[:-1]
mask[starts] = 1
mask[ends] -= 1  # This takes care of zero-length segments automatically
mask = np.cumsum(mask, out=mask).view(bool)

ends 的额外处理,作为 indices 的第二列解包,处理掩码 运行 到数组末尾的情况。由于结束索引是独占的,这将超过数组的末尾,但这也意味着 运行 根本不需要终止。

现在您的掩码已经过过滤和 trimmed,您可以分配给 largeArr 中的正确行。最简单的方法是根据需要重复 pattern 多次:

largeArr[mask, :] = np.tile(pattern, [runs.sum() // n, 1])

如果你把它打包成一个函数,你可以运行它用于多种模式:

def replace_pattern(arr, search, pattern):
    n = len(pattern)
    mask = (arr == search).all(1)
    indices = np.flatnonzero(np.diff(np.r_[False, mask, False])).reshape(-1, 2)
    runs = np.diff(indices, axis=1).squeeze()
    np.floor_divide(runs, n, out=runs)
    np.multiply(runs, n, out=runs)
    indices[:, 1] = indices[:, 0] + runs
    mask = np.zeros_like(mask, dtype=np.int8)
    starts, ends = indices.T
    if ends[-1] == mask.size:
        ends = ends[:-1]
    mask[starts] = 1
    mask[ends] -= 1
    mask = np.cumsum(mask, out=mask).view(bool)
    arr[mask, :] = np.tile(pattern, [runs.sum() // n, 1])

replacements = [
    ([0, 1, 1], [[0, 2, 1],
                 [0, 2, 2],
                 [0, 2, 3]]),
    ([3, 2, 0], [[5, 2, 1],
                 [5, 3, 2],
                 [5, 4, 3],
                 [5, 5, 4]])
]

largeArr = np.array(...)

for search, pattern in replacements:
    replace_pattern(largeArr, search, pattern)