向量化 Python 中包含 cumsum() 和迭代切片的 for 循环
Vectorizing a for loop in Python that includes a cumsum() and iterative slicing
我有以下 for 循环,它在三个相同长度的 numpy 数组上运行:
n = 100
a = np.random.random(n)
b = np.random.random(n)
c = np.random.random(n)
valid = np.empty(n)
for i in range(n):
valid[i] = np.any(a[i] > b[i:] + c[i:].cumsum())
有没有办法用一些向量化的 numpy 操作替换这个 for 循环?
例如,因为我只关心 a[i]
是否大于 b[i:]
中的任何值,所以我可以执行 np.minimum.accumulate(b[::-1])[::-1]
获取 b
的最小值每个索引及以后的索引,然后将其与 a
进行比较,如下所示:
a > np.minimum.accumulate(b[::-1])[::-1]
但我仍然需要一种方法将 c[i:].cumsum()
向量化为单个数组计算。
我假设您想对其进行矢量化以减少 运行 时间。由于您仅使用纯 NumPy 操作,因此可以使用 numba
:参见 5 Minutes Guide to Numba
它将看起来像这样:
import numba
@numba.njit()
def valid_for_single_idx(idx, a, b, c):
return np.any(a[idx] > b[idx:] + c[idx:].cumsum())
valid = np.empty(n)
for i in range(n):
valid[i] = valid_for_single_idx(i, a, b, c)
到目前为止,它并不是真正的矢量化(因为循环仍在发生),但它将 numpy 行转换为 llvm,因此它尽可能快地发生。
虽然没有提高速度,但是看起来好看一点,可以用.map
:
import numba
from functools import partial
@numba.njit()
def valid_for_single_idx(idx, a, b, c):
return np.any(a[idx] > b[idx:] + c[idx:].cumsum())
valid = map(partial(valid_for_single_idx, a=a, b=b, c=c), range(n))
您的目标是为每个 i
找到 b[i:] + c[i:].cumsum()
的最小值。显然,您可以直接将其与 a
进行比较。
可以将c[i:].cumsum()
的元素写成矩阵的上三角。让我们看一个 n = 3
:
的玩具箱
c = [c1, c2, c3]
s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]
累加和的元素可以写成
c1, c1 + c2, c1 + c2 + c3 s1[0:] s1[0:] - s0[0]
c2, c2 + c3 = s1[1:] - c1 = s1[1:] - s0[1]
c3 s1[2:] - (c1 + c2) s1[2:] - s0[2]
您可以使用 np.triu_indices
将这些总和构造为一个拆分数组:
r, c = np.triu_indices(n)
diff = s1[c] - s0[r] + b[c]
因为 np.minimum
is a ufunc, you can accumulate diff
for each run defined by r
using minimum.reduceat
. The locations are given roughly by np.flatnonzero(np.diff(r)) + 1
, but you can generate them faster with np.arange
:
m = np.minimum.reduceat(diff, np.r_[0, np.arange(n, 1, -1).cumsum()])
所以最后,你有:
valid = a > m
TL;DR
s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]
r, c = np.triu_indices(n)
valid = a > np.minimum.reduceat(s1[c] - s0[r] + b[c], np.r_[0, np.arange(n, 1, -1).cumsum()])
我有以下 for 循环,它在三个相同长度的 numpy 数组上运行:
n = 100
a = np.random.random(n)
b = np.random.random(n)
c = np.random.random(n)
valid = np.empty(n)
for i in range(n):
valid[i] = np.any(a[i] > b[i:] + c[i:].cumsum())
有没有办法用一些向量化的 numpy 操作替换这个 for 循环?
例如,因为我只关心 a[i]
是否大于 b[i:]
中的任何值,所以我可以执行 np.minimum.accumulate(b[::-1])[::-1]
获取 b
的最小值每个索引及以后的索引,然后将其与 a
进行比较,如下所示:
a > np.minimum.accumulate(b[::-1])[::-1]
但我仍然需要一种方法将 c[i:].cumsum()
向量化为单个数组计算。
我假设您想对其进行矢量化以减少 运行 时间。由于您仅使用纯 NumPy 操作,因此可以使用 numba
:参见 5 Minutes Guide to Numba
它将看起来像这样:
import numba
@numba.njit()
def valid_for_single_idx(idx, a, b, c):
return np.any(a[idx] > b[idx:] + c[idx:].cumsum())
valid = np.empty(n)
for i in range(n):
valid[i] = valid_for_single_idx(i, a, b, c)
到目前为止,它并不是真正的矢量化(因为循环仍在发生),但它将 numpy 行转换为 llvm,因此它尽可能快地发生。
虽然没有提高速度,但是看起来好看一点,可以用.map
:
import numba
from functools import partial
@numba.njit()
def valid_for_single_idx(idx, a, b, c):
return np.any(a[idx] > b[idx:] + c[idx:].cumsum())
valid = map(partial(valid_for_single_idx, a=a, b=b, c=c), range(n))
您的目标是为每个 i
找到 b[i:] + c[i:].cumsum()
的最小值。显然,您可以直接将其与 a
进行比较。
可以将c[i:].cumsum()
的元素写成矩阵的上三角。让我们看一个 n = 3
:
c = [c1, c2, c3]
s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]
累加和的元素可以写成
c1, c1 + c2, c1 + c2 + c3 s1[0:] s1[0:] - s0[0]
c2, c2 + c3 = s1[1:] - c1 = s1[1:] - s0[1]
c3 s1[2:] - (c1 + c2) s1[2:] - s0[2]
您可以使用 np.triu_indices
将这些总和构造为一个拆分数组:
r, c = np.triu_indices(n)
diff = s1[c] - s0[r] + b[c]
因为 np.minimum
is a ufunc, you can accumulate diff
for each run defined by r
using minimum.reduceat
. The locations are given roughly by np.flatnonzero(np.diff(r)) + 1
, but you can generate them faster with np.arange
:
m = np.minimum.reduceat(diff, np.r_[0, np.arange(n, 1, -1).cumsum()])
所以最后,你有:
valid = a > m
TL;DR
s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]
r, c = np.triu_indices(n)
valid = a > np.minimum.reduceat(s1[c] - s0[r] + b[c], np.r_[0, np.arange(n, 1, -1).cumsum()])