并行遍历 numpy 数组的区域
Iterating over regions of numpy array in parallel
我有一个 3D 数组,需要对其进行迭代,提取一个 2x2x2 体素大区域并检查是否有任何体素不为零。在这些位置中,我需要区域的唯一元素和索引:
import time
import numpy as np
np.random.seed(1234)
def _naive_iterator(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any(): # check if any value in the region is non-zero
yield n.ravel(), i, j, k
# yield set(n.ravel()), i, j, k # `set()` alone takes some time - for testing purposes exclude this
# arrays that shall be used here are in the region of (1000, 1000, 1000) or larger
# arrays are asserted to contain only integer values >= 0.
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (_naive_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
[x for x in fun(array)]
print(f" ** execution took {time.time() - tic}")
在我的电脑上,这个循环大约需要 24 秒到 运行。 (有趣的旁注:没有 n.any(),循环只需要 8s,所以也许还有一些优化潜力?)
我考虑过如何使它更快,可能是通过 运行 并行处理它。但是,如果不预先生成所有 2x2x2 数组,我不知道该怎么做。
我也考虑过使用 scipy.ndimage.generic_filter
但这样我只能得到一个图像,例如 1 我想包括的所有像素,但我必须迭代原始图像才能得到 n.ravel()
(理想情况下,人们会直接使用 generic_filter
,但我无法在被调用函数中获取索引)。
我怎样才能加速这个循环,可能是通过并行化迭代?
无论何时使用 numpy,都应尽量避免显式循环。这些循环是用 python 编写的,因此通常比您可以使用矢量化执行的任何操作都慢。这样你就可以将循环推迟到底层 C 函数,这些函数的速度几乎可以达到任何速度。所以我会用类似下面的方法来解决你的问题。此函数与您的 _naive_iterator
做的事情大致相同,但以矢量化方式进行,没有任何 python 循环:
from numpy.lib.stride_tricks import sliding_window_view
def get_windows_and_indices(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
x, y, z = np.mgrid[0:nx, 0:ny, 0:nz]
lookup = np.stack([lookup, x, y, z])
out = sliding_window_view(lookup, (2, 2, 2), axis=(1, 2, 3)).reshape(4, -1, 2, 2, 2)
windows = out[0, ...]
ic = out[1, ..., 0, 0, 0]
jc = out[2, ..., 0, 0, 0]
kc = out[3, ..., 0, 0, 0]
mask = windows.any(axis=(1, 2, 3))
return windows[mask], ic[mask], jc[mask], kc[mask]
当然,您还需要稍微不同地考虑其余代码,但如果您想(有效地)使用 numpy,那么矢量化确实是您需要习惯的东西。
另外我很确定即使上面的这个功能也不是最佳的,并且肯定可以进一步改进。
without the n.any(), the loop needs only 8s, so maybe there is some optimization potential as well?
这是因为 Numpy 函数对于像 2x2x2 这样的非常小的数组 有很大的开销。 Numpy 函数的开销大约是几微秒,而实际 n.any()
计算在主流处理器上应该不会超过十几纳秒。通常的解决方案是 向量化 操作以避免许多 Numpy 调用。您可以使用 Numba 来加速此代码并消除大部分 CPython/Numpy 开销。请注意,Numba 目前不支持 pad
等所有功能,因此需要一种解决方法。这是结果代码:
import time
import numpy as np
import numba as nb
np.random.seed(1234)
@nb.njit('(uint8[:,:,::1],)')
def numba_iterator(lookup):
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any():
yield n.ravel(), i, j, k
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (numba_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
[x for x in fun(lookup)]
print(f" ** execution took {time.time() - tic}")
这在我的机器上快了很多倍(但仍然很慢)。
I thought about how I could make this faster, potentially by running it in parallel.
只要使用 yield
这是不可能的,因为 生成器本质上是顺序的。
How can I speed up this loop
一个解决方案可能是在 Numba 中将整个输出生成为 Numpy 数组,以避免 创建 800 万个 Numpy 对象 存储在作为主要来源的 CPython 列表中使用 Numba 优化后的代码减速(每次调用 n.ravel
都会创建一个新数组)。请注意, 生成器通常很慢 ,因为它们通常需要 context-switch(一种 lightweight-thread / 协程)。就性能而言,最好的解决方案是在循环中计算数据 on-the-fly。
此外,n.any
和 n.ravel
可以在 Numba 中手动重写,以提高效率。事实上,n
数组视图非常小,使用 3 个嵌套循环和常量 compile-time 绑定有助于编译器生成快速代码(即,它可以展开循环并仅生成很少的指令处理器可以非常有效地执行)。
这是修改后的改进代码(手动计算填充数组):
@nb.njit('(uint8[:,:,::1],)')
def fast_compute(array):
nx, ny, nz = array.shape
# Padding (with zeros)
lookup = np.zeros((nx+2, ny+2, nz+2), dtype=np.uint8)
for i in range(nx):
for j in range(ny):
for k in range(nz):
lookup[i+1, j+1, k+1] = array[i, j, k]
# Actual computation
size = (nx + 1) * (ny + 1) * (nz + 1)
result = np.empty((size, 8), dtype=np.uint8)
indices = np.empty((size, 3), dtype=np.uint32)
cur = 0
for i in range(nx + 1):
for j in range(ny + 1):
for k in range(nz + 1):
n = lookup[i:i+2, j:j+2, k:k+2]
# Fast manual n.any()
found = False
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
found |= n[i2, j2, k2]
if found:
# Fast manual n.ravel()
cur2 = 0
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
result[cur, cur2] = n[i2, j2, k2]
cur2 += 1
indices[cur, 0] = i
indices[cur, 1] = j
indices[cur, 2] = k
cur += 1
return result[:cur].reshape(cur,2,2,2), indices[:cur]
生成的代码相当大,但这是高性能计算的代价。
正如@norok2 所指出的,result[:cur]
和indices[:cur]
是引用数组的视图。与分配的数组相比,视图可能非常小。如果这是一个问题,您可以 return 一个副本(例如 result[:cur].copy()
)以避免可能的内存过度消耗。实际上,这应该不是问题,因为数组是在 virtual memory 中分配的,并且只有写入的页面映射到主流系统的物理内存中(例如 Windows & Linux)。虚拟内存页面仅在 第一次接触 期间映射到物理内存(即,当项目首次写入时)。现代平台可以分配大量虚拟内存(例如,在我的主流 x86-64 Windows 上为 131072 GiB,在主流 x86-64 Linux 上甚至更多),而物理内存则更加稀缺(例如. 16 GiB 在我的机器上)。当不再有视图引用它时,底层数组将被释放。
基准
_naive_iterator: 21.25 s
numba_iterator: 8.10 s
get_windows_and_indices: 1.35 s
fast_compute: 0.13 s
最后一个 Numba 函数比第一个函数快 163 倍,比 @flawr 的向量化 Numpy 实现快 10 倍。
Numba 实现当然可以是 multi-threaded,但这并不容易,因为线程需要写入输出并且写入项的位置(即 cur
)取决于其他线程。此外,这会使代码复杂得多。
在保留功能的同时加速代码的最简单方法是使用 Numba。我假设填充本质上是一个装饰步骤,我会在答案的最后单独处理它。
这是最初提出的代码和简单的 Numba 加速的更清晰的实现:
import numpy as np
import numba as nb
def i_cubicles_3D_set_OP(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield set(window.ravel()), (i, j, k)
i_cubicles_3D_set_OP_nb = nb.njit(i_cubicles_3D_set_OP)
i_cubicles_3D_set_OP_nb.__name__ = "i_cubicles_3D_set_OP_nb"
如果有人对它的 dimension-agnostic 版本感兴趣(以牺牲一些速度为代价),可以写:
def i_cubicles_set_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_set_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def not_only_zeros(seq):
# assumes seq is not empty
count = 0
for x in seq:
if x == 0:
count += 1
break # because only unique values
return len(seq) != count
@nb.njit
def _i_cubicles_set_nb(arr, shape):
for i, x in enumerate(arr):
uniques = set(x)
if not_only_zeros(uniques):
yield uniques, unravel_index(i, shape)
这引入了生成输入的跨步 (read-only) 视图的重要技巧,它可用于在概念上简化所有循环,但代价是必须手动解开索引。
这与 中提出的想法相似。
在 50³ 大小的索引上,我得到以下时间:
np.random.seed(42)
n = 50
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
def is_equal_i_set(a, b):
return all(x[0] == y[0] and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_set_OP_nb, i_cubicles_3d_set_OP, i_cubicles_set_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i_set(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# i_cubicles_3d_set_OP_nb True 1 loop, best of 5: 130 ms per loop
# i_cubicles_3d_set_OP True 1 loop, best of 5: 776 ms per loop
# i_cubicles_set_nb True 10 loops, best of 5: 151 ms per loop
表明使用 Numba 非常有效。
没有唯一性
如果愿意放弃 return 仅在隔间内设置唯一元素的要求,将它们替换为隔间内的所有元素,那么确实会获得一些(但不是很多)速度:
@nb.njit
def i_cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield window.ravel(), (i, j, k)
def i_cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _i_cubicles_nb(arr, shape):
for i, x in enumerate(arr):
if any_nb(x):
yield x, unravel_index(i, shape)
如以下基准所证明(在与之前相同的 50³ 大小的输入上):
def is_equal_i(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_nb, i_cubicles_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# print()
# i_cubicles_3d_nb True 10 loops, best of 5: 116 ms per loop
# i_cubicles_nb True 10 loops, best of 5: 125 ms per loop
没有收益(也没有唯一性)
虽然很明显只有使用 Numba / Cython 才能使与 OP 输出完全匹配的函数变得更快,但是通过前面的 OP 代码的一些特性可以获得许多快速方法。
特别是,在创建生成器时,会花费大量时间来创建要产生的实际对象。
可以一次 returned(最重要的是分配)相同的信息,并且速度显着提高,特别是如果我们跳过创建用于计算唯一元素的容器。
一旦我们接受 return 隔间内的所有元素而不是它的独特元素,就可以设计 NumPy-only 矢量化(快速和 dimension-agnostic)方法,同时更快Numba(特定于 3d)实现:
def cubicles_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(-arr.ndim, 0)))
return view[mask, ...], np.array(np.nonzero(mask)).transpose()
def cubicles_tr_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=window + reduced_shape, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(arr.ndim)))
return (
view[..., mask].reshape((window_size, -1)).transpose().reshape((-1, *window)),
np.array(np.nonzero(mask)).transpose())
def cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
values, indexes = _cubicles_nb(view.reshape((-1, window_size)), reduced_shape, arr.ndim)
return values.reshape((-1, *window)), indexes
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _cubicles_nb(arr, shape, ndim):
n, k = arr.shape
indexes = np.empty((n, ndim), dtype=np.bool_)
result = np.empty((n, k), dtype=arr.dtype)
count = 0
for i in range(n):
x = arr[i]
if any_nb(x):
indexes[count] = unravel_index(i, shape)
result[count] = x
count += 1
return result[:count].copy(), indexes[:count].copy()
@nb.njit
def any_cubicle_3d_nb(arr, size):
for i in range(size):
for j in range(size):
for k in range(size):
if arr[i, j, k]:
return True
return False
@nb.njit
def cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
nn = nx * ny * nz
indexes = np.empty((nn, 3), dtype=np.bool_)
result = np.empty((nn, size, size, size), dtype=arr.dtype)
count = 0
for i in range(nx):
for j in range(ny):
for k in range(nz):
x = arr[i:i + size, j:j + size, k:k + size]
if any_cubicle_3d_nb(x, size):
result[count] = x
indexes[count] = i, j, k
count += 1
return result[:count].copy(), indexes[:count].copy()
在 50³ 大小的输入上再次获得的计时确实表明 Numba-based 方法表明拼出循环比在视图中循环要快得多。
事实上,如果没有显式地沿维度循环,NumPy-only 方法可以比 Numba 加速方法更快。
请注意,cubicles_3d_nb()
本质上可以看作是 的 cleaned-up 版本。
(实际上,JérômeRichard 的 fast_compute()
在我的机器上的时间和输入——加上额外的 .copy()
——似乎表明 cubicles_3d_nb()
更有效——可能是因为short-circuiting 在“任何”代码中,无需手动整理值)。
def is_equal(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = cubicles_3d_nb, cubicles_nb, cubicles_np, cubicles_tr_np
base = funcs[0](arr)
for func in funcs:
res = func(arr)
print(f"{func.__name__:>24} {is_equal(base, res)!s:>5}", end=' ')
%timeit func(arr)
# cubicles_3d_nb True 100 loops, best of 5: 3.82 ms per loop
# cubicles_nb True 10 loops, best of 5: 23 ms per loop
# cubicles_np True 10 loops, best of 5: 24.7 ms per loop
# cubicles_tr_np True 100 loops, best of 5: 16.5 ms per loop
索引说明
如果要一次给出所有结果,那么索引本身在存储有关 non-zero 隔间位置的信息方面并不是特别有效,除非它们很少。
相反,布尔数组的内存效率更高。
索引需要 index_size * ndim * num
(num
是 non-zero 个隔间的数量,必然是 0 < num < prod(shape)
)。
屏蔽需要 bool_size * prod(shape)
。
对于 NumPy bool_size = 8
而 index_size = 64
(可以调整但通常至少为 16),所以:index_size = bool_size * k
。
所以屏蔽更有效,只要:
num < prod(shape) // (k * ndim)
对于 3D 和典型的 index_size = 64
,这意味着 (num / prod(shape)) < (1 / 24)
,因此如果 non-zero 个隔间是 ~5% 或更少,索引是有效的。
Speed-wise,只要 non-zero 隔间,使用布尔掩码而不是索引可能会导致实现速度小而合理(~5 到 ~20%)也不少。
附录:填充
虽然 Numba 不支持 np.pad()
,但在 Numba 之外调用任何填充函数都非常简单。
此外,对于某些输入组合,np.pad()
比在切片输出上简单赋值要慢:
import numpy as np
import numba as nb
@nb.njit
def pad_3d_nb(arr, size=1):
nx, ny, nz = arr.shape
result = np.zeros((nx + 2 * size, ny + 2 * size, nz + 2 * size), dtype=arr.dtype)
for i in range(nx):
for j in range(ny):
for k in range(nz):
result[i + size, j + size, k + size] = arr[i, j, k]
return result
def const_pad(arr, size=1, value=0):
shape = tuple(dim + 2 * size for dim in arr.shape)
mask = tuple(slice(size, dim + size) for dim in arr.shape)
result = np.full(shape, value, dtype=arr.dtype)
result[mask] = arr
return result
np.random.seed(42)
n = 200
k = 10
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
base = np.pad(arr, (k, k))
print(np.allclose(pad_3d_nb(arr, k), base))
# True
print(np.allclose(const_pad(arr, k), base))
# True
%timeit np.pad(arr, (k, k))
# 100 loops, best of 5: 3.01 ms per loop
%timeit pad_3d_nb(arr, k)
# 100 loops, best of 5: 11.5 ms per loop
%timeit const_pad(arr, k)
# 100 loops, best of 5: 2.53 ms per loop
我有一个 3D 数组,需要对其进行迭代,提取一个 2x2x2 体素大区域并检查是否有任何体素不为零。在这些位置中,我需要区域的唯一元素和索引:
import time
import numpy as np
np.random.seed(1234)
def _naive_iterator(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any(): # check if any value in the region is non-zero
yield n.ravel(), i, j, k
# yield set(n.ravel()), i, j, k # `set()` alone takes some time - for testing purposes exclude this
# arrays that shall be used here are in the region of (1000, 1000, 1000) or larger
# arrays are asserted to contain only integer values >= 0.
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (_naive_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
[x for x in fun(array)]
print(f" ** execution took {time.time() - tic}")
在我的电脑上,这个循环大约需要 24 秒到 运行。 (有趣的旁注:没有 n.any(),循环只需要 8s,所以也许还有一些优化潜力?)
我考虑过如何使它更快,可能是通过 运行 并行处理它。但是,如果不预先生成所有 2x2x2 数组,我不知道该怎么做。
我也考虑过使用 scipy.ndimage.generic_filter
但这样我只能得到一个图像,例如 1 我想包括的所有像素,但我必须迭代原始图像才能得到 n.ravel()
(理想情况下,人们会直接使用 generic_filter
,但我无法在被调用函数中获取索引)。
我怎样才能加速这个循环,可能是通过并行化迭代?
无论何时使用 numpy,都应尽量避免显式循环。这些循环是用 python 编写的,因此通常比您可以使用矢量化执行的任何操作都慢。这样你就可以将循环推迟到底层 C 函数,这些函数的速度几乎可以达到任何速度。所以我会用类似下面的方法来解决你的问题。此函数与您的 _naive_iterator
做的事情大致相同,但以矢量化方式进行,没有任何 python 循环:
from numpy.lib.stride_tricks import sliding_window_view
def get_windows_and_indices(array):
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
nx, ny, nz = lookup.shape
x, y, z = np.mgrid[0:nx, 0:ny, 0:nz]
lookup = np.stack([lookup, x, y, z])
out = sliding_window_view(lookup, (2, 2, 2), axis=(1, 2, 3)).reshape(4, -1, 2, 2, 2)
windows = out[0, ...]
ic = out[1, ..., 0, 0, 0]
jc = out[2, ..., 0, 0, 0]
kc = out[3, ..., 0, 0, 0]
mask = windows.any(axis=(1, 2, 3))
return windows[mask], ic[mask], jc[mask], kc[mask]
当然,您还需要稍微不同地考虑其余代码,但如果您想(有效地)使用 numpy,那么矢量化确实是您需要习惯的东西。 另外我很确定即使上面的这个功能也不是最佳的,并且肯定可以进一步改进。
without the n.any(), the loop needs only 8s, so maybe there is some optimization potential as well?
这是因为 Numpy 函数对于像 2x2x2 这样的非常小的数组 有很大的开销。 Numpy 函数的开销大约是几微秒,而实际 n.any()
计算在主流处理器上应该不会超过十几纳秒。通常的解决方案是 向量化 操作以避免许多 Numpy 调用。您可以使用 Numba 来加速此代码并消除大部分 CPython/Numpy 开销。请注意,Numba 目前不支持 pad
等所有功能,因此需要一种解决方法。这是结果代码:
import time
import numpy as np
import numba as nb
np.random.seed(1234)
@nb.njit('(uint8[:,:,::1],)')
def numba_iterator(lookup):
nx, ny, nz = lookup.shape
for i in range(nx - 1):
for j in range(ny - 1):
for k in range(nz - 1):
n = lookup[i:i + 2, j:j + 2, k:k + 2]
if n.any():
yield n.ravel(), i, j, k
array = np.random.randint(0, 2, (200, 200, 200), dtype=np.uint8)
for fun in (numba_iterator, ):
print(f"* {fun}")
for _ in range(2):
tic = time.time()
lookup = np.pad(array, (1, 1), 'constant') # Border is filled with 0
[x for x in fun(lookup)]
print(f" ** execution took {time.time() - tic}")
这在我的机器上快了很多倍(但仍然很慢)。
I thought about how I could make this faster, potentially by running it in parallel.
只要使用 yield
这是不可能的,因为 生成器本质上是顺序的。
How can I speed up this loop
一个解决方案可能是在 Numba 中将整个输出生成为 Numpy 数组,以避免 创建 800 万个 Numpy 对象 存储在作为主要来源的 CPython 列表中使用 Numba 优化后的代码减速(每次调用 n.ravel
都会创建一个新数组)。请注意, 生成器通常很慢 ,因为它们通常需要 context-switch(一种 lightweight-thread / 协程)。就性能而言,最好的解决方案是在循环中计算数据 on-the-fly。
此外,n.any
和 n.ravel
可以在 Numba 中手动重写,以提高效率。事实上,n
数组视图非常小,使用 3 个嵌套循环和常量 compile-time 绑定有助于编译器生成快速代码(即,它可以展开循环并仅生成很少的指令处理器可以非常有效地执行)。
这是修改后的改进代码(手动计算填充数组):
@nb.njit('(uint8[:,:,::1],)')
def fast_compute(array):
nx, ny, nz = array.shape
# Padding (with zeros)
lookup = np.zeros((nx+2, ny+2, nz+2), dtype=np.uint8)
for i in range(nx):
for j in range(ny):
for k in range(nz):
lookup[i+1, j+1, k+1] = array[i, j, k]
# Actual computation
size = (nx + 1) * (ny + 1) * (nz + 1)
result = np.empty((size, 8), dtype=np.uint8)
indices = np.empty((size, 3), dtype=np.uint32)
cur = 0
for i in range(nx + 1):
for j in range(ny + 1):
for k in range(nz + 1):
n = lookup[i:i+2, j:j+2, k:k+2]
# Fast manual n.any()
found = False
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
found |= n[i2, j2, k2]
if found:
# Fast manual n.ravel()
cur2 = 0
for i2 in range(2):
for j2 in range(2):
for k2 in range(2):
result[cur, cur2] = n[i2, j2, k2]
cur2 += 1
indices[cur, 0] = i
indices[cur, 1] = j
indices[cur, 2] = k
cur += 1
return result[:cur].reshape(cur,2,2,2), indices[:cur]
生成的代码相当大,但这是高性能计算的代价。
正如@norok2 所指出的,result[:cur]
和indices[:cur]
是引用数组的视图。与分配的数组相比,视图可能非常小。如果这是一个问题,您可以 return 一个副本(例如 result[:cur].copy()
)以避免可能的内存过度消耗。实际上,这应该不是问题,因为数组是在 virtual memory 中分配的,并且只有写入的页面映射到主流系统的物理内存中(例如 Windows & Linux)。虚拟内存页面仅在 第一次接触 期间映射到物理内存(即,当项目首次写入时)。现代平台可以分配大量虚拟内存(例如,在我的主流 x86-64 Windows 上为 131072 GiB,在主流 x86-64 Linux 上甚至更多),而物理内存则更加稀缺(例如. 16 GiB 在我的机器上)。当不再有视图引用它时,底层数组将被释放。
基准
_naive_iterator: 21.25 s
numba_iterator: 8.10 s
get_windows_and_indices: 1.35 s
fast_compute: 0.13 s
最后一个 Numba 函数比第一个函数快 163 倍,比 @flawr 的向量化 Numpy 实现快 10 倍。
Numba 实现当然可以是 multi-threaded,但这并不容易,因为线程需要写入输出并且写入项的位置(即 cur
)取决于其他线程。此外,这会使代码复杂得多。
在保留功能的同时加速代码的最简单方法是使用 Numba。我假设填充本质上是一个装饰步骤,我会在答案的最后单独处理它。
这是最初提出的代码和简单的 Numba 加速的更清晰的实现:
import numpy as np
import numba as nb
def i_cubicles_3D_set_OP(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield set(window.ravel()), (i, j, k)
i_cubicles_3D_set_OP_nb = nb.njit(i_cubicles_3D_set_OP)
i_cubicles_3D_set_OP_nb.__name__ = "i_cubicles_3D_set_OP_nb"
如果有人对它的 dimension-agnostic 版本感兴趣(以牺牲一些速度为代价),可以写:
def i_cubicles_set_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_set_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def not_only_zeros(seq):
# assumes seq is not empty
count = 0
for x in seq:
if x == 0:
count += 1
break # because only unique values
return len(seq) != count
@nb.njit
def _i_cubicles_set_nb(arr, shape):
for i, x in enumerate(arr):
uniques = set(x)
if not_only_zeros(uniques):
yield uniques, unravel_index(i, shape)
这引入了生成输入的跨步 (read-only) 视图的重要技巧,它可用于在概念上简化所有循环,但代价是必须手动解开索引。
这与
在 50³ 大小的索引上,我得到以下时间:
np.random.seed(42)
n = 50
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
def is_equal_i_set(a, b):
return all(x[0] == y[0] and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_set_OP_nb, i_cubicles_3d_set_OP, i_cubicles_set_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i_set(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# i_cubicles_3d_set_OP_nb True 1 loop, best of 5: 130 ms per loop
# i_cubicles_3d_set_OP True 1 loop, best of 5: 776 ms per loop
# i_cubicles_set_nb True 10 loops, best of 5: 151 ms per loop
表明使用 Numba 非常有效。
没有唯一性
如果愿意放弃 return 仅在隔间内设置唯一元素的要求,将它们替换为隔间内的所有元素,那么确实会获得一些(但不是很多)速度:
@nb.njit
def i_cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
for i in range(nx):
for j in range(ny):
for k in range(nz):
window = arr[i:i + size, j:j + size, k:k + size]
if window.any():
yield window.ravel(), (i, j, k)
def i_cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
return _i_cubicles_nb(view.reshape((-1, window_size)), reduced_shape)
@nb.njit
def unravel_index(x, shape):
result = np.zeros(len(shape), dtype=np.int_)
for i, dim in enumerate(shape[::-1], 1):
result[-i] = x % dim
x //= dim
return result
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _i_cubicles_nb(arr, shape):
for i, x in enumerate(arr):
if any_nb(x):
yield x, unravel_index(i, shape)
如以下基准所证明(在与之前相同的 50³ 大小的输入上):
def is_equal_i(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = i_cubicles_3d_nb, i_cubicles_nb
base = list(funcs[0](arr))
for func in funcs:
res = list(func(arr))
print(f"{func.__name__:>24} {is_equal_i(base, res)!s:>5}", end=' ')
# %timeit -n 1 -r 1 list(func(arr))
%timeit list(func(arr))
# print()
# i_cubicles_3d_nb True 10 loops, best of 5: 116 ms per loop
# i_cubicles_nb True 10 loops, best of 5: 125 ms per loop
没有收益(也没有唯一性)
虽然很明显只有使用 Numba / Cython 才能使与 OP 输出完全匹配的函数变得更快,但是通过前面的 OP 代码的一些特性可以获得许多快速方法。
特别是,在创建生成器时,会花费大量时间来创建要产生的实际对象。 可以一次 returned(最重要的是分配)相同的信息,并且速度显着提高,特别是如果我们跳过创建用于计算唯一元素的容器。
一旦我们接受 return 隔间内的所有元素而不是它的独特元素,就可以设计 NumPy-only 矢量化(快速和 dimension-agnostic)方法,同时更快Numba(特定于 3d)实现:
def cubicles_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(-arr.ndim, 0)))
return view[mask, ...], np.array(np.nonzero(mask)).transpose()
def cubicles_tr_np(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=window + reduced_shape, strides=arr.strides * 2, writeable=False)
mask = np.any(view, axis=tuple(range(arr.ndim)))
return (
view[..., mask].reshape((window_size, -1)).transpose().reshape((-1, *window)),
np.array(np.nonzero(mask)).transpose())
def cubicles_nb(arr, size=2):
window = (size,) * arr.ndim
window_size = size ** arr.ndim
reduced_shape = tuple(dim - size + 1 for dim, size in zip(arr.shape, window))
view = np.lib.stride_tricks.as_strided(
arr, shape=reduced_shape + window, strides=arr.strides * 2, writeable=False)
values, indexes = _cubicles_nb(view.reshape((-1, window_size)), reduced_shape, arr.ndim)
return values.reshape((-1, *window)), indexes
@nb.njit
def any_nb(arr):
for x in arr:
if x:
return True
return False
@nb.njit
def _cubicles_nb(arr, shape, ndim):
n, k = arr.shape
indexes = np.empty((n, ndim), dtype=np.bool_)
result = np.empty((n, k), dtype=arr.dtype)
count = 0
for i in range(n):
x = arr[i]
if any_nb(x):
indexes[count] = unravel_index(i, shape)
result[count] = x
count += 1
return result[:count].copy(), indexes[:count].copy()
@nb.njit
def any_cubicle_3d_nb(arr, size):
for i in range(size):
for j in range(size):
for k in range(size):
if arr[i, j, k]:
return True
return False
@nb.njit
def cubicles_3d_nb(arr, size=2):
nx, ny, nz = arr.shape
nx += 1 - size
ny += 1 - size
nz += 1 - size
nn = nx * ny * nz
indexes = np.empty((nn, 3), dtype=np.bool_)
result = np.empty((nn, size, size, size), dtype=arr.dtype)
count = 0
for i in range(nx):
for j in range(ny):
for k in range(nz):
x = arr[i:i + size, j:j + size, k:k + size]
if any_cubicle_3d_nb(x, size):
result[count] = x
indexes[count] = i, j, k
count += 1
return result[:count].copy(), indexes[:count].copy()
在 50³ 大小的输入上再次获得的计时确实表明 Numba-based 方法表明拼出循环比在视图中循环要快得多。 事实上,如果没有显式地沿维度循环,NumPy-only 方法可以比 Numba 加速方法更快。
请注意,cubicles_3d_nb()
本质上可以看作是 fast_compute()
在我的机器上的时间和输入——加上额外的 .copy()
——似乎表明 cubicles_3d_nb()
更有效——可能是因为short-circuiting 在“任何”代码中,无需手动整理值)。
def is_equal(a, b):
return all(np.allclose(x[0], y[0]) and np.allclose(x[1], y[1]) for x, y in zip(a, b))
funcs = cubicles_3d_nb, cubicles_nb, cubicles_np, cubicles_tr_np
base = funcs[0](arr)
for func in funcs:
res = func(arr)
print(f"{func.__name__:>24} {is_equal(base, res)!s:>5}", end=' ')
%timeit func(arr)
# cubicles_3d_nb True 100 loops, best of 5: 3.82 ms per loop
# cubicles_nb True 10 loops, best of 5: 23 ms per loop
# cubicles_np True 10 loops, best of 5: 24.7 ms per loop
# cubicles_tr_np True 100 loops, best of 5: 16.5 ms per loop
索引说明
如果要一次给出所有结果,那么索引本身在存储有关 non-zero 隔间位置的信息方面并不是特别有效,除非它们很少。
相反,布尔数组的内存效率更高。
索引需要 index_size * ndim * num
(num
是 non-zero 个隔间的数量,必然是 0 < num < prod(shape)
)。
屏蔽需要 bool_size * prod(shape)
。
对于 NumPy bool_size = 8
而 index_size = 64
(可以调整但通常至少为 16),所以:index_size = bool_size * k
。
所以屏蔽更有效,只要:
num < prod(shape) // (k * ndim)
对于 3D 和典型的 index_size = 64
,这意味着 (num / prod(shape)) < (1 / 24)
,因此如果 non-zero 个隔间是 ~5% 或更少,索引是有效的。
Speed-wise,只要 non-zero 隔间,使用布尔掩码而不是索引可能会导致实现速度小而合理(~5 到 ~20%)也不少。
附录:填充
虽然 Numba 不支持 np.pad()
,但在 Numba 之外调用任何填充函数都非常简单。
此外,对于某些输入组合,np.pad()
比在切片输出上简单赋值要慢:
import numpy as np
import numba as nb
@nb.njit
def pad_3d_nb(arr, size=1):
nx, ny, nz = arr.shape
result = np.zeros((nx + 2 * size, ny + 2 * size, nz + 2 * size), dtype=arr.dtype)
for i in range(nx):
for j in range(ny):
for k in range(nz):
result[i + size, j + size, k + size] = arr[i, j, k]
return result
def const_pad(arr, size=1, value=0):
shape = tuple(dim + 2 * size for dim in arr.shape)
mask = tuple(slice(size, dim + size) for dim in arr.shape)
result = np.full(shape, value, dtype=arr.dtype)
result[mask] = arr
return result
np.random.seed(42)
n = 200
k = 10
arr = np.random.randint(0, 3, (n, n, n), dtype=np.uint8)
base = np.pad(arr, (k, k))
print(np.allclose(pad_3d_nb(arr, k), base))
# True
print(np.allclose(const_pad(arr, k), base))
# True
%timeit np.pad(arr, (k, k))
# 100 loops, best of 5: 3.01 ms per loop
%timeit pad_3d_nb(arr, k)
# 100 loops, best of 5: 11.5 ms per loop
%timeit const_pad(arr, k)
# 100 loops, best of 5: 2.53 ms per loop