将 N-dim 数组广播到 (N+1)-dim 数组并对除 1 dim 以外的所有数组求和
Broadcasting N-dim array to (N+1)-dim array and summing on all but 1 dim
假设您有一个形状为 (a,b,c) 的 numpy 数组和一个形状为 (a,b,c,d) 的布尔掩码。
我想将掩码应用于在最后一个轴上迭代的数组,沿前三个轴对掩码数组求和,并获得 length/shape (d,) 的列表(或数组)。
我试图通过列表理解来做到这一点:
Result = [np.sum(Array[Mask[:,:,:,i]], axis=(0,1,2)) for i in range(d)]
它可以工作,但它看起来不是很 pythonic,而且速度也有点慢。
我也试过类似
Array = Array[:,:,:,np.newaxis]
Result = np.sum(Array[Mask], axis=(0,1,2))
但这当然行不通,因为 Mask 沿最后一个轴的维度 d 大于数组最后一个轴的维度 1。
此外,考虑到每个轴的维度可能为 100 或 200,因此使用 np.repeat
沿新的最后一个轴重复数组 d 次确实会占用大量内存,我想避免这种情况。
除了列表理解之外,还有其他更快、更 pythonic 的替代方法吗?
怎么样
Array.reshape(-1)@Mask.reshape(-1,d)
既然你是在前三个轴上求和,你不妨合并它们,之后很容易看出该操作可以写成矩阵向量乘积
示例:
a,b,c,d = 4,5,6,7
Mask = np.random.randint(0,2,(a,b,c,d),bool)
Array = np.random.randint(0,10,(a,b,c))
[np.sum(Array[Mask[:,:,:,i]]) for i in range(d)]
# [310, 237, 253, 261, 229, 268, 184]
Array.reshape(-1)@Mask.reshape(-1,d)
# array([310, 237, 253, 261, 229, 268, 184])
将 N 维数组广播到匹配的 (N+1) 维数组的最直接方法是使用 np.broadcast_to()
:
import numpy as np
arr = np.random.randint(0, 100, (2, 3))
mask = np.random.randint(0, 2, (2, 3, 4), dtype=bool)
b_arr = np.broadcast_to(arr[..., None], mask.shape)
print(mask.shape == b_arr.shape)
# True
但是,正如@hpaulj 已经指出的那样,您不能在不丢失尺寸的情况下使用 mask
进行切片 b_arr
。
鉴于您只想将元素相加并对零求和 "does not hurt",您可以简单地将数组和掩码按元素相乘,以保持正确的维度,但 [=掩码中的 19=] 与相应数组元素的后续 sum
无关:
result = np.sum(b_arr * mask, axis=tuple(range(mask.ndim - 1)))
或者,因为 *
会自动进行广播:
result = np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))
无需首先使用 np.broadcast_to()
(但您仍然需要匹配维数,即使用 arr[..., None]
而不仅仅是 arr
)。
作为@PaulPanzer , since you want to sum up over all but one dimensions, this can be further simplified using np.matmul()
/@
:
result2 = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(np.all(result == result2))
# True
关于求和的更高级的操作,请看np.einsum()
。
编辑
广播的问题在于它会在表达式求值期间创建临时数组。
对于你似乎要处理的数字,我根本无法使用广播数组,因为我将 运行 转换为 MemoryError
,但时间方面,元素方面的乘法可能仍然更好方法比你最初提出的要多。
或者,如果您追求速度,您可以在较低的级别上使用 Cython 或 Numba 中的显式循环来执行此操作。
您可以在下面找到几个基于 Numba 的解决方案(处理 ravel()
-ed 数据):
_vector_matrix_product()
: 不使用任何临时数组
_vector_matrix_product_mp()
:部分同上但使用并行执行
_vector_matrix_product_sum()
:使用np.sum()
和并行执行
import numpy as np
import numba as nb
@nb.jit(nopython=True)
def _vector_matrix_product(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in range(rows):
for j in range(cols):
result_arr[i] += vect_arr[j] * mat_arr[i, j]
else:
for i in range(rows):
for j in range(cols):
result_arr[j] += vect_arr[i] * mat_arr[i, j]
@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_mp(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in nb.prange(rows):
for j in nb.prange(cols):
result_arr[i] += vect_arr[j] * mat_arr[i, j]
else:
for i in nb.prange(rows):
for j in nb.prange(cols):
result_arr[j] += vect_arr[i] * mat_arr[i, j]
@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_sum(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in nb.prange(rows):
result_arr[i] = np.sum(vect_arr * mat_arr[i, :])
else:
for j in nb.prange(cols):
result_arr[j] = np.sum(vect_arr * mat_arr[:, j])
def vector_matrix_product(
vect_arr,
mat_arr,
swap=False,
dtype=None,
mode=None):
rows, cols = mat_arr.shape
if not dtype:
dtype = (vect_arr[0] * mat_arr[0, 0]).dtype
if not swap:
result_arr = np.zeros(cols, dtype=dtype)
else:
result_arr = np.zeros(rows, dtype=dtype)
if mode == 'sum':
_vector_matrix_product_sum(vect_arr, mat_arr, result_arr)
elif mode == 'mp':
_vector_matrix_product_mp(vect_arr, mat_arr, result_arr)
else:
_vector_matrix_product(vect_arr, mat_arr, result_arr)
return result_arr
np.random.seed(0)
arr = np.random.randint(0, 100, (2, 3, 4))
mask = np.random.randint(0, 2, (2, 3, 4, 5), dtype=bool)
target = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(target)
# [820 723 861 486 408]
result1 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
print(result1)
# [820 723 861 486 408]
result2 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
print(result2)
# [820 723 861 486 408]
result3 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
print(result3)
# [820 723 861 486 408]
与任何基于 list
理解的解决方案相比,改进了计时:
arr = np.random.randint(0, 100, (256, 256, 256))
mask = np.random.randint(0, 2, (256, 256, 256, 128), dtype=bool)
%timeit np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))
# MemoryError
%timeit arr.ravel() @ mask.reshape(-1, mask.shape[-1])
# MemoryError
%timeit np.array([np.sum(arr * mask[..., i], axis=tuple(range(mask.ndim - 1))) for i in range(mask.shape[-1])])
# 24.1 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit np.array([np.sum(arr[mask[..., i]]) for i in range(mask.shape[-1])])
# 46 s ± 119 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
# 408 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
# 1.63 s ± 3.58 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
# 7.17 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
正如预期的那样,JIT 加速版本是最快的,并且对代码强制执行并行性不会提高速度。
另请注意,逐元素乘法的方法比切片更快(这些基准测试的速度大约快两倍)。
编辑 2
按照@max9111 的建议,首先按行循环,然后按列循环导致最耗时的循环 运行 连续数据,从而显着加快速度。
如果没有这个技巧,_vector_matrix_product_sum()
和 _vector_matrix_product_mp()
将以基本相同的速度 运行。
假设您有一个形状为 (a,b,c) 的 numpy 数组和一个形状为 (a,b,c,d) 的布尔掩码。 我想将掩码应用于在最后一个轴上迭代的数组,沿前三个轴对掩码数组求和,并获得 length/shape (d,) 的列表(或数组)。 我试图通过列表理解来做到这一点:
Result = [np.sum(Array[Mask[:,:,:,i]], axis=(0,1,2)) for i in range(d)]
它可以工作,但它看起来不是很 pythonic,而且速度也有点慢。 我也试过类似
Array = Array[:,:,:,np.newaxis]
Result = np.sum(Array[Mask], axis=(0,1,2))
但这当然行不通,因为 Mask 沿最后一个轴的维度 d 大于数组最后一个轴的维度 1。
此外,考虑到每个轴的维度可能为 100 或 200,因此使用 np.repeat
沿新的最后一个轴重复数组 d 次确实会占用大量内存,我想避免这种情况。
除了列表理解之外,还有其他更快、更 pythonic 的替代方法吗?
怎么样
Array.reshape(-1)@Mask.reshape(-1,d)
既然你是在前三个轴上求和,你不妨合并它们,之后很容易看出该操作可以写成矩阵向量乘积
示例:
a,b,c,d = 4,5,6,7
Mask = np.random.randint(0,2,(a,b,c,d),bool)
Array = np.random.randint(0,10,(a,b,c))
[np.sum(Array[Mask[:,:,:,i]]) for i in range(d)]
# [310, 237, 253, 261, 229, 268, 184]
Array.reshape(-1)@Mask.reshape(-1,d)
# array([310, 237, 253, 261, 229, 268, 184])
将 N 维数组广播到匹配的 (N+1) 维数组的最直接方法是使用 np.broadcast_to()
:
import numpy as np
arr = np.random.randint(0, 100, (2, 3))
mask = np.random.randint(0, 2, (2, 3, 4), dtype=bool)
b_arr = np.broadcast_to(arr[..., None], mask.shape)
print(mask.shape == b_arr.shape)
# True
但是,正如@hpaulj 已经指出的那样,您不能在不丢失尺寸的情况下使用 mask
进行切片 b_arr
。
鉴于您只想将元素相加并对零求和 "does not hurt",您可以简单地将数组和掩码按元素相乘,以保持正确的维度,但 [=掩码中的 19=] 与相应数组元素的后续 sum
无关:
result = np.sum(b_arr * mask, axis=tuple(range(mask.ndim - 1)))
或者,因为 *
会自动进行广播:
result = np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))
无需首先使用 np.broadcast_to()
(但您仍然需要匹配维数,即使用 arr[..., None]
而不仅仅是 arr
)。
作为@PaulPanzer np.matmul()
/@
:
result2 = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(np.all(result == result2))
# True
关于求和的更高级的操作,请看np.einsum()
。
编辑
广播的问题在于它会在表达式求值期间创建临时数组。
对于你似乎要处理的数字,我根本无法使用广播数组,因为我将 运行 转换为 MemoryError
,但时间方面,元素方面的乘法可能仍然更好方法比你最初提出的要多。
或者,如果您追求速度,您可以在较低的级别上使用 Cython 或 Numba 中的显式循环来执行此操作。
您可以在下面找到几个基于 Numba 的解决方案(处理 ravel()
-ed 数据):
_vector_matrix_product()
: 不使用任何临时数组_vector_matrix_product_mp()
:部分同上但使用并行执行_vector_matrix_product_sum()
:使用np.sum()
和并行执行
import numpy as np
import numba as nb
@nb.jit(nopython=True)
def _vector_matrix_product(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in range(rows):
for j in range(cols):
result_arr[i] += vect_arr[j] * mat_arr[i, j]
else:
for i in range(rows):
for j in range(cols):
result_arr[j] += vect_arr[i] * mat_arr[i, j]
@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_mp(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in nb.prange(rows):
for j in nb.prange(cols):
result_arr[i] += vect_arr[j] * mat_arr[i, j]
else:
for i in nb.prange(rows):
for j in nb.prange(cols):
result_arr[j] += vect_arr[i] * mat_arr[i, j]
@nb.jit(nopython=True, parallel=True)
def _vector_matrix_product_sum(
vect_arr,
mat_arr,
result_arr):
rows, cols = mat_arr.shape
if vect_arr.shape == result_arr.shape:
for i in nb.prange(rows):
result_arr[i] = np.sum(vect_arr * mat_arr[i, :])
else:
for j in nb.prange(cols):
result_arr[j] = np.sum(vect_arr * mat_arr[:, j])
def vector_matrix_product(
vect_arr,
mat_arr,
swap=False,
dtype=None,
mode=None):
rows, cols = mat_arr.shape
if not dtype:
dtype = (vect_arr[0] * mat_arr[0, 0]).dtype
if not swap:
result_arr = np.zeros(cols, dtype=dtype)
else:
result_arr = np.zeros(rows, dtype=dtype)
if mode == 'sum':
_vector_matrix_product_sum(vect_arr, mat_arr, result_arr)
elif mode == 'mp':
_vector_matrix_product_mp(vect_arr, mat_arr, result_arr)
else:
_vector_matrix_product(vect_arr, mat_arr, result_arr)
return result_arr
np.random.seed(0)
arr = np.random.randint(0, 100, (2, 3, 4))
mask = np.random.randint(0, 2, (2, 3, 4, 5), dtype=bool)
target = arr.ravel() @ mask.reshape(-1, mask.shape[-1])
print(target)
# [820 723 861 486 408]
result1 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
print(result1)
# [820 723 861 486 408]
result2 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
print(result2)
# [820 723 861 486 408]
result3 = vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
print(result3)
# [820 723 861 486 408]
与任何基于 list
理解的解决方案相比,改进了计时:
arr = np.random.randint(0, 100, (256, 256, 256))
mask = np.random.randint(0, 2, (256, 256, 256, 128), dtype=bool)
%timeit np.sum(arr[..., None] * mask, axis=tuple(range(mask.ndim - 1)))
# MemoryError
%timeit arr.ravel() @ mask.reshape(-1, mask.shape[-1])
# MemoryError
%timeit np.array([np.sum(arr * mask[..., i], axis=tuple(range(mask.ndim - 1))) for i in range(mask.shape[-1])])
# 24.1 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit np.array([np.sum(arr[mask[..., i]]) for i in range(mask.shape[-1])])
# 46 s ± 119 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]))
# 408 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='mp')
# 1.63 s ± 3.58 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit vector_matrix_product(arr.ravel(), mask.reshape(-1, mask.shape[-1]), mode='sum')
# 7.17 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
正如预期的那样,JIT 加速版本是最快的,并且对代码强制执行并行性不会提高速度。 另请注意,逐元素乘法的方法比切片更快(这些基准测试的速度大约快两倍)。
编辑 2
按照@max9111 的建议,首先按行循环,然后按列循环导致最耗时的循环 运行 连续数据,从而显着加快速度。
如果没有这个技巧,_vector_matrix_product_sum()
和 _vector_matrix_product_mp()
将以基本相同的速度 运行。