用于在相同长度的一维 numpy 数组上评估一维函数数组的高效算法

Efficient algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array

我有一个(大)长度为 N 的 k 不同函数数组,以及一个长度为 N 的横坐标数组。我想将横坐标处的函数计算为 return 一个长度为 N 的纵坐标数组,关键是,我需要非常快地完成它。

我已经尝试通过以下循环调用 np.where,这太慢了:

创建一些假数据来说明问题:

def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary

我们有 250 个不同的函数 table。现在我创建了一个大数组,其中包含这些函数的许多重复条目,以及一组相同长度的点,这些点应在这些点上进行评估。

Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]

最后,遍历数据使用的每个函数并在相关点集上对其进行评估:

desired_output = np.zeros(Npts)
for func_index in set(function_indices):
    idx = np.where(function_indices==func_index)[0]
    desired_output[idx] = func_table[func_index](abcissa_array[idx])

这个循环在我的笔记本电脑上大约需要 0.35 秒,这是我代码中最大的瓶颈一个数量级。

有人知道如何避免对 np.where 的盲目查找调用吗?有没有巧妙地使用 numba 来加快这个循环的速度?

这是函数式编程的一个很好的例子,在 Python 中有所模仿。

现在,如果您想将您的函数应用于一组点,我建议您使用 numpyufunc 框架,它将允许您创建速度极快的矢量化版本函数。

感谢 hpaulj 建议采用 groupby 方法。有很多用于此操作的固定例程,例如 Pandas DataFrames,但它们都带有数据结构初始化的开销成本,这是一次性的,但如果使用 for 可能会很昂贵只是一个计算。

这是我的纯 numpy 解决方案,比我使用的原始 where 循环快 13 倍。 结果总结是我用np.argsortnp.unique连同一些花哨的索引体操。

首先我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素

idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)

现在不再需要盲目查找,因为我们确切地知道排序数组的哪个切片对应于每个唯一函数。所以我们仍然循环遍历每个调用的函数,但不调用 where:

for func_index in range(len(unique_funcs)-1):
    idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
    func = func_table[unique_funcs[func_index]]
    desired_output[idx_func] = func(abcissa_array[idx_func])

这涵盖了除最终索引之外的所有索引,由于 Python 索引约定,我们需要单独调用它有点烦人:

func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])

这给出了与 where 循环(簿记完整性检查)相同的结果,但此循环的运行时间为 0.027 秒,比我最初的计算速度快了 13 倍。

这与您的(优秀!)自我回答几乎相同,但不那么繁琐。在我的机器上它似乎也稍微快了一点——基于粗略的 test.

大约 30 毫秒
def apply_indexed_fast(array, func_indices, func_table):
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(array)
    for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
        ix = func_argsort[start:end]
        out[ix] = f(array[ix])
    return out

像你的一样,这将一系列 argsort 索引分成块,每个块对应于 func_table 中的一个函数。然后它使用每个块为其相应函数 select 输入和输出索引。为了确定块边界,它使用 np.searchsorted 而不是 np.unique —— 其中 searchsorted(a, b) 可以被认为是二进制搜索算法, returns 中第一个值的索引a 等于或大于给定值或 b 中的值。

然后 zip 函数简单地并行迭代它的参数,从每个参数返回一个项目,将它们收集在一个元组中,然后将它们串在一起成一个列表。 (所以 zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd']) returns [(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')]。)这个连同 for 语句对 "unpack" 这些元组的内置能力,允许一种简洁但富有表现力的方式并行迭代多个序列。

在这种情况下,我用它来遍历 func_tables 中的函数以及 func_ranges 的两个不同步副本。这确保 end 变量中 func_ranges 的项目始终比 start 变量中的项目领先一步。通过将 None 附加到 func_ranges,我确保最终块得到妥善处理——zip 在其任何一个参数用完项目时停止,这会切断顺序。方便的是,None 值也用作开放式切片索引!

另一个做同样事情的技巧需要多几行,但内存开销较低,尤其是与 zipizip 等价的 itertools 一起使用时:

range_iter_a = iter(func_ranges)   # create generators that iterate over the 
range_iter_b = iter(func_ranges)   # values in `func_ranges` without making copies
next(range_iter_b, None)           # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
    ...

但是,这些基于生成器的低开销方法有时可能比原始列表慢一点。另外,请注意在 Python 3 中,zip 的行为更像 izip