向量化 Python 中包含 cumsum() 和迭代切片的 for 循环

Vectorizing a for loop in Python that includes a cumsum() and iterative slicing

我有以下 for 循环,它在三个相同长度的 numpy 数组上运行:

n = 100
a = np.random.random(n)
b = np.random.random(n)
c = np.random.random(n)

valid = np.empty(n)
for i in range(n):
    valid[i] = np.any(a[i] > b[i:] + c[i:].cumsum())

有没有办法用一些向量化的 numpy 操作替换这个 for 循环?

例如,因为我只关心 a[i] 是否大于 b[i:] 中的任何值,所以我可以执行 np.minimum.accumulate(b[::-1])[::-1] 获取 b 的最小值每个索引及以后的索引,然后将其与 a 进行比较,如下所示:

a > np.minimum.accumulate(b[::-1])[::-1]

但我仍然需要一种方法将 c[i:].cumsum() 向量化为单个数组计算。

我假设您想对其进行矢量化以减少 运行 时间。由于您仅使用纯 NumPy 操作,因此可以使用 numba:参见 5 Minutes Guide to Numba

它将看起来像这样:

import numba

@numba.njit()
def valid_for_single_idx(idx, a, b, c):
    return np.any(a[idx] > b[idx:] + c[idx:].cumsum())


valid = np.empty(n)
for i in range(n):
    valid[i] = valid_for_single_idx(i, a, b, c)

到目前为止,它并不是真正的矢量化(因为循环仍在发生),但它将 numpy 行转换为 llvm,因此它尽可能快地发生。

虽然没有提高速度,但是看起来好看一点,可以用.map:

import numba
from functools import partial

@numba.njit()
def valid_for_single_idx(idx, a, b, c):
    return np.any(a[idx] > b[idx:] + c[idx:].cumsum())


valid = map(partial(valid_for_single_idx, a=a, b=b, c=c), range(n))

您的目标是为每个 i 找到 b[i:] + c[i:].cumsum() 的最小值。显然,您可以直接将其与 a 进行比较。

可以将c[i:].cumsum()的元素写成矩阵的上三角。让我们看一个 n = 3:

的玩具箱
c = [c1, c2, c3]
s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]

累加和的元素可以写成

c1, c1 + c2, c1 + c2 + c3     s1[0:]                 s1[0:] - s0[0]
         c2,      c2 + c3  =  s1[1:] - c1         =  s1[1:] - s0[1]
                       c3     s1[2:] - (c1 + c2)     s1[2:] - s0[2]

您可以使用 np.triu_indices 将这些总和构造为一个拆分数组:

r, c = np.triu_indices(n)
diff = s1[c] - s0[r] + b[c]

因为 np.minimum is a ufunc, you can accumulate diff for each run defined by r using minimum.reduceat. The locations are given roughly by np.flatnonzero(np.diff(r)) + 1, but you can generate them faster with np.arange:

m = np.minimum.reduceat(diff, np.r_[0, np.arange(n, 1, -1).cumsum()])

所以最后,你有:

valid = a > m

TL;DR

s1 = c.cumsum()
s0 = np.r_[0, s1[:-1]]
r, c = np.triu_indices(n)
valid = a > np.minimum.reduceat(s1[c] - s0[r] + b[c], np.r_[0, np.arange(n, 1, -1).cumsum()])