将 N-dim 数组广播到 (N+1)-dim 数组并对除 1 dim 以外的所有数组求和

Broadcasting N-dim array to (N+1)-dim array and summing on all but 1 dim

假设您有一个形状为 (a,b,c) 的 numpy 数组和一个形状为 (a,b,c,d) 的布尔掩码。 我想将掩码应用于在最后一个轴上迭代的数组,沿前三个轴对掩码数组求和,并获得 length/shape (d,) 的列表(或数组)。 我试图通过列表理解来做到这一点:

Result = [np.sum(Array[Mask[:,:,:,i]], axis=(0,1,2)) for i in range(d)]

它可以工作,但它看起来不是很 pythonic,而且速度也有点慢。 我也试过类似

Array = Array[:,:,:,np.newaxis]
Result = np.sum(Array[Mask], axis=(0,1,2))

但这当然行不通,因为 Mask 沿最后一个轴的维度 d 大于数组最后一个轴的维度 1。 此外,考虑到每个轴的维度可能为 100 或 200,因此使用 np.repeat 沿新的最后一个轴重复数组 d 次确实会占用大量内存,我想避免这种情况。 除了列表理解之外,还有其他更快、更 pythonic 的替代方法吗?

怎么样

Array.reshape(-1)@Mask.reshape(-1,d)

既然你是在前三个轴上求和,你不妨合并它们,之后很容易看出该操作可以写成矩阵向量乘积

示例:

a,b,c,d = 4,5,6,7
Mask = np.random.randint(0,2,(a,b,c,d),bool)
Array = np.random.randint(0,10,(a,b,c))
[np.sum(Array[Mask[:,:,:,i]]) for i in range(d)]
# [310, 237, 253, 261, 229, 268, 184]    
Array.reshape(-1)@Mask.reshape(-1,d)
# array([310, 237, 253, 261, 229, 268, 184])

将 N 维数组广播到匹配的 (N+1) 维数组的最直接方法是使用 np.broadcast_to():

import numpy as np


arr = np.random.randint(0, 100, (2, 3))
mask = np.random.randint(0, 2, (2, 3, 4), dtype=bool)
b_arr = np.broadcast_to(arr[..., None], mask.shape)
print(mask.shape == b_arr.shape)
# True

但是,正如@hpaulj 已经指出的那样,您不能在不丢失尺寸的情况下使用 mask 进行切片 b_arr


鉴于您只想将元素相加并对零求和 "does not hurt",您可以简单地将数组和掩码按元素相乘,以保持正确的维度,但 [=掩码中的 19=] 与相应数组元素的后续 sum 无关:

result = np.sum(b_arr * mask, axis=tuple(range(mask.ndim - 1)))

或者,因为 * 会自动进行广播:

result = np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))

无需首先使用 np.broadcast_to()(但您仍然需要匹配维数,即使用 arr[..., None] 而不仅仅是 arr)。


作为@PaulPanzer , since you want to sum up over all but one dimensions, this can be further simplified using np.matmul()/@:

result2 = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(np.all(result == result2))
# True

关于求和的更高级的操作,请看np.einsum()


编辑

广播的问题在于它会在表达式求值期间创建临时数组。

对于你似乎要处理的数字,我根本无法使用广播数组,因为我将 运行 转换为 MemoryError,但时间方面,元素方面的乘法可能仍然更好方法比你最初提出的要多。

或者,如果您追求速度,您可以在较低的级别上使用 Cython 或 Numba 中的显式循环来执行此操作。

您可以在下面找到几个基于 Numba 的解决方案(处理 ravel()-ed 数据):

  • _vector_matrix_product(): 不使用任何临时数组
  • _vector_matrix_product_mp():部分同上但使用并行执行
  • _vector_matrix_product_sum():使用np.sum()和并行执行
import numpy as np
import numba as nb


@nb.jit(nopython=True)
def _vector_matrix_product(
        vect_arr,
        mat_arr,
        result_arr):
    rows, cols = mat_arr.shape
    if vect_arr.shape == result_arr.shape:
        for i in range(rows):
            for j in range(cols):
                result_arr[i] += vect_arr[j] * mat_arr[i, j]
    else:
        for i in range(rows):
            for j in range(cols):            
                result_arr[j] += vect_arr[i] * mat_arr[i, j]


@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_mp(
        vect_arr,
        mat_arr,
        result_arr):
    rows, cols = mat_arr.shape
    if vect_arr.shape == result_arr.shape:
        for i in nb.prange(rows):
            for j in nb.prange(cols):
                result_arr[i] += vect_arr[j] * mat_arr[i, j]
    else:
        for i in nb.prange(rows):
            for j in nb.prange(cols):        
                result_arr[j] += vect_arr[i] * mat_arr[i, j]


@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_sum(
        vect_arr,
        mat_arr,
        result_arr):
    rows, cols = mat_arr.shape
    if vect_arr.shape == result_arr.shape:
        for i in nb.prange(rows):
            result_arr[i] = np.sum(vect_arr * mat_arr[i, :])
    else:
        for j in nb.prange(cols):
            result_arr[j] = np.sum(vect_arr * mat_arr[:, j])


def vector_matrix_product(
        vect_arr,
        mat_arr,
        swap=False,
        dtype=None,
        mode=None):
    rows, cols = mat_arr.shape
    if not dtype:
        dtype = (vect_arr[0] * mat_arr[0, 0]).dtype
    if not swap:
        result_arr = np.zeros(cols, dtype=dtype)
    else:
        result_arr = np.zeros(rows, dtype=dtype)
    if mode == 'sum':
        _vector_matrix_product_sum(vect_arr, mat_arr, result_arr)
    elif mode == 'mp':
        _vector_matrix_product_mp(vect_arr, mat_arr, result_arr)
    else:
        _vector_matrix_product(vect_arr, mat_arr, result_arr)
    return result_arr


np.random.seed(0)
arr = np.random.randint(0, 100, (2, 3, 4))
mask = np.random.randint(0, 2, (2, 3, 4, 5), dtype=bool)
target = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(target)
# [820 723 861 486 408]
result1 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
print(result1)
# [820 723 861 486 408]
result2 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
print(result2)
# [820 723 861 486 408]
result3 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
print(result3)
# [820 723 861 486 408]

与任何基于 list 理解的解决方案相比,改进了计时:

arr = np.random.randint(0, 100, (256, 256, 256))
mask = np.random.randint(0, 2, (256, 256, 256, 128), dtype=bool)


%timeit np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))
# MemoryError

%timeit arr.ravel() @ mask.reshape(-1, mask.shape[-1])
# MemoryError

%timeit np.array([np.sum(arr * mask[..., i], axis=tuple(range(mask.ndim - 1))) for i in range(mask.shape[-1])])
# 24.1 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit np.array([np.sum(arr[mask[..., i]]) for i in range(mask.shape[-1])])
# 46 s ± 119 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
# 408 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
# 1.63 s ± 3.58 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
# 7.17 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

正如预期的那样,JIT 加速版本是最快的,并且对代码强制执行并行性不会提高速度。 另请注意,逐元素乘法的方法比切片更快(这些基准测试的速度大约快两倍)。


编辑 2

按照@max9111 的建议,首先按行循环,然后按列循环导致最耗时的循环 运行 连续数据,从而显着加快速度。 如果没有这个技巧,_vector_matrix_product_sum()_vector_matrix_product_mp() 将以基本相同的速度 运行。