如何切片 numpy 数组以提取多维数组中的特定索引

How to slice numpy array to extract specific indices in a multidimentional array

在运行时,mask数组中的值被更新,并且根据maskTrue条目的索引,我必须对[=11=中的相应子矩阵求和].以下代码实现了预期:

result = np.zeros((300, 300))
for i in range(300):
    for j in range(300):
        if mask[i, j]:
            result = result + data[i, j]

但是,上面的代码非常慢!

我希望使用 NumPy 的内置切片和索引方法。大致思路如下:

  1. 使用 np.where(mask==True) 到 select mask 的索引 True
  2. data 中提取形状 (M, N, 300, 300) 的子矩阵 reducedData 对应于 步骤 1
  3. 中的索引
  4. 沿轴 0 和轴 1 (np.sum(reducedData, axis=(0, 1))) 对 reducedData 数组求和

我不确定如何实现上述 步骤 2。感谢您的帮助!

问题中的循环可以通过基于数组大小的 Numba 来加速。所以,如果我们可以创建一个示例作为数据:

rng = np.random.default_rng(85)

number = 80
a = np.zeros((number//2, number), dtype=bool)
b = np.ones((number//2, number), dtype=bool)
con = np.concatenate((a, b)).ravel()
rng.shuffle(con)
mask = con.reshape(number, number)
Data = rng.random(number ** 4, dtype=np.float64).reshape(number, number, number, number)
result = np.zeros((number, number))

我们可以通过 Numba 并行化 jitting 来加速代码:

@nb.njit("boolean[:,::1], float64[:,::1], float64[:,:,:,::1]", parallel=True)
def numba_(mask, result, Data):
    for i in nb.prange(number):
        for j in range(number):
            if (mask[i, j]):
                result = result + Data[i, j]
    return result

这是在我的系统上进行的性能比较。对于较小的数组,代码 运行 快 3 到 4 倍,而数组形状 (150,150,150,150) 它 运行 快大约 2 倍。由于内存泄漏,无法测试更大的数组(内存问题是由于 rng.random 在我的测试中创建的数据)。
要解决内存问题,我的 可能会有所帮助。我尝试为这个问题编写一些类似于提到的答案的代码,但经过一段时间后我无法完成这段代码(由于我的工作量,它是 time-consuming 为我调试)并且它需要如果需要,可以更正。我把不完整的代码 只是为了启发如何使用这样的 chunks:

@nb.njit("boolean[:,::1], float64[:,::1], float64[:,:,:,::1]", parallel=True)
def numba_(mask, result, Data):
    chunk_val = 50                        # it is arbitrary and must be chosen based on the system rams size
    chunk = Data.shape[0] // chunk_val
    chunk_res = Data.shape[0] % chunk_val

    for i in range(chunk):
        for j in range(number):
            for k in range(i * chunk_val, (i + 1) * chunk_val):
                if mask[j, k]:
                    min_ind = i * chunk_val
                    max_ind = (i + 1) * chunk_val
                    result[:, min_ind:max_ind] = result[:, min_ind:max_ind] + Data[j, k][:, min_ind:max_ind]
            if i == chunk - 1:
                for k in range((i + 1) * chunk_val, Data.shape[0]):
                    if mask[j, k]:
                        result[:, -chunk_res:] = result[:, -chunk_res:] + Data[j, k][:, -chunk_res:]
    return result

请注意,此代码必须在完成后根据内存消耗进行评估,但在速度方面将作为我使用 numba 的建议答案。

基准测试(核心 i5 G.no1,内存 16):

shape = (70, 70, 70, 70)               (~x3 _ very faster)
31247600    ns    --> OP code
0           ns    --> Numba accelerated

shape = (100, 100, 100, 100)           (~x2.5)
78147400    ns    --> OP code
31229900    ns    --> Numba accelerated

shape = (150, 150, 150, 150)           (~x2)
390626400   ns    --> OP code
218730800   ns    --> Numba accelerated

shape = (170, 170, 170, 170)           (~x2)
625012500   ns    --> OP code
328132000   ns    --> Numba accelerated
  • 已更新 Colab (comparing to )
  • 上的基准
10 loops, best of 5: 46.4 ms per loop    # This answer (numba)   (~x7)
10 loops, best of 5: 344  ms per loop    # Mad Physicist

过度技巧是使用 np.add.reduceat 来避免创建一个巨大的临时数组。例如,在我的机器上,以这种方式对 (300, 300, 100, 100) 数组(这是我能够分配的最大数组)求和不到一秒:

np.add.reduce(data, axis=(0, 1), out=out, where=mask[..., None, None])

唯一需要注意的是,您无法广播掩码:您必须明确设置尺寸。

碰巧,你的循环并没有那么慢:它比我机器上的矢量化操作快约 2 倍。

np.random.seed(435)
data = np.random.uniform(size=(300, 300, 100, 100))
mask = np.random.randint(2, size=(300, 300), dtype=bool)
out = np.zeros((100, 100), float)

result = np.zeros((100, 100))
for i in range(300):
    for j in range(300):
        if mask[i, j]:
            result = result + data[i, j]

(result == out).all() ## True
%timeit np.add.reduce(data, axis=(0, 1), out=out, where=mask[..., None, None])
715 ms ± 1.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
result = np.zeros((100, 100))
for i in range(300):
    for j in range(300):
        if mask[i, j]:
            result = result + data[i, j]
442 ms ± 2.02 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

这两种方法中的任何一种都比您提出的方法具有巨大优势,因为它们不会使用大量内存。你的三个步骤归结为

data[mask].sum(0)

您可以跳过第 1 步,因为 numpy 允许使用布尔掩码进行索引,而掩码只能产生一维输出,因为行可能包含不同数量的元素,因此是单个总和。问题是 data[mask] 简单地杀死了我的解释器,因为临时数组大小太大。