用于在相同长度的一维 numpy 数组上评估一维函数数组的高效算法
Efficient algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array
我有一个(大)长度为 N 的 k 不同函数数组,以及一个长度为 N 的横坐标数组。我想将横坐标处的函数计算为 return 一个长度为 N 的纵坐标数组,关键是,我需要非常快地完成它。
我已经尝试通过以下循环调用 np.where,这太慢了:
创建一些假数据来说明问题:
def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary
我们有 250 个不同的函数 table。现在我创建了一个大数组,其中包含这些函数的许多重复条目,以及一组相同长度的点,这些点应在这些点上进行评估。
Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]
最后,遍历数据使用的每个函数并在相关点集上对其进行评估:
desired_output = np.zeros(Npts)
for func_index in set(function_indices):
idx = np.where(function_indices==func_index)[0]
desired_output[idx] = func_table[func_index](abcissa_array[idx])
这个循环在我的笔记本电脑上大约需要 0.35 秒,这是我代码中最大的瓶颈一个数量级。
有人知道如何避免对 np.where 的盲目查找调用吗?有没有巧妙地使用 numba 来加快这个循环的速度?
这是函数式编程的一个很好的例子,在 Python 中有所模仿。
现在,如果您想将您的函数应用于一组点,我建议您使用 numpy
的 ufunc
框架,它将允许您创建速度极快的矢量化版本函数。
感谢 hpaulj 建议采用 groupby 方法。有很多用于此操作的固定例程,例如 Pandas DataFrames,但它们都带有数据结构初始化的开销成本,这是一次性的,但如果使用 for 可能会很昂贵只是一个计算。
这是我的纯 numpy 解决方案,比我使用的原始 where 循环快 13 倍。 结果总结是我用np.argsort和np.unique连同一些花哨的索引体操。
首先我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素
idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)
现在不再需要盲目查找,因为我们确切地知道排序数组的哪个切片对应于每个唯一函数。所以我们仍然循环遍历每个调用的函数,但不调用 where:
for func_index in range(len(unique_funcs)-1):
idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这涵盖了除最终索引之外的所有索引,由于 Python 索引约定,我们需要单独调用它有点烦人:
func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这给出了与 where 循环(簿记完整性检查)相同的结果,但此循环的运行时间为 0.027 秒,比我最初的计算速度快了 13 倍。
这与您的(优秀!)自我回答几乎相同,但不那么繁琐。在我的机器上它似乎也稍微快了一点——基于粗略的 test.
大约 30 毫秒
def apply_indexed_fast(array, func_indices, func_table):
func_argsort = func_indices.argsort()
func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
func_ranges.append(None)
out = np.zeros_like(array)
for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
ix = func_argsort[start:end]
out[ix] = f(array[ix])
return out
像你的一样,这将一系列 argsort
索引分成块,每个块对应于 func_table
中的一个函数。然后它使用每个块为其相应函数 select 输入和输出索引。为了确定块边界,它使用 np.searchsorted
而不是 np.unique
—— 其中 searchsorted(a, b)
可以被认为是二进制搜索算法, returns 中第一个值的索引a
等于或大于给定值或 b
中的值。
然后 zip 函数简单地并行迭代它的参数,从每个参数返回一个项目,将它们收集在一个元组中,然后将它们串在一起成一个列表。 (所以 zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd'])
returns [(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')]
。)这个连同 for
语句对 "unpack" 这些元组的内置能力,允许一种简洁但富有表现力的方式并行迭代多个序列。
在这种情况下,我用它来遍历 func_tables
中的函数以及 func_ranges
的两个不同步副本。这确保 end
变量中 func_ranges
的项目始终比 start
变量中的项目领先一步。通过将 None
附加到 func_ranges
,我确保最终块得到妥善处理——zip
在其任何一个参数用完项目时停止,这会切断顺序。方便的是,None
值也用作开放式切片索引!
另一个做同样事情的技巧需要多几行,但内存开销较低,尤其是与 zip
、izip
等价的 itertools
一起使用时:
range_iter_a = iter(func_ranges) # create generators that iterate over the
range_iter_b = iter(func_ranges) # values in `func_ranges` without making copies
next(range_iter_b, None) # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
...
但是,这些基于生成器的低开销方法有时可能比原始列表慢一点。另外,请注意在 Python 3 中,zip
的行为更像 izip
。
我有一个(大)长度为 N 的 k 不同函数数组,以及一个长度为 N 的横坐标数组。我想将横坐标处的函数计算为 return 一个长度为 N 的纵坐标数组,关键是,我需要非常快地完成它。
我已经尝试通过以下循环调用 np.where,这太慢了:
创建一些假数据来说明问题:
def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary
我们有 250 个不同的函数 table。现在我创建了一个大数组,其中包含这些函数的许多重复条目,以及一组相同长度的点,这些点应在这些点上进行评估。
Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]
最后,遍历数据使用的每个函数并在相关点集上对其进行评估:
desired_output = np.zeros(Npts)
for func_index in set(function_indices):
idx = np.where(function_indices==func_index)[0]
desired_output[idx] = func_table[func_index](abcissa_array[idx])
这个循环在我的笔记本电脑上大约需要 0.35 秒,这是我代码中最大的瓶颈一个数量级。
有人知道如何避免对 np.where 的盲目查找调用吗?有没有巧妙地使用 numba 来加快这个循环的速度?
这是函数式编程的一个很好的例子,在 Python 中有所模仿。
现在,如果您想将您的函数应用于一组点,我建议您使用 numpy
的 ufunc
框架,它将允许您创建速度极快的矢量化版本函数。
感谢 hpaulj 建议采用 groupby 方法。有很多用于此操作的固定例程,例如 Pandas DataFrames,但它们都带有数据结构初始化的开销成本,这是一次性的,但如果使用 for 可能会很昂贵只是一个计算。
这是我的纯 numpy 解决方案,比我使用的原始 where 循环快 13 倍。 结果总结是我用np.argsort和np.unique连同一些花哨的索引体操。
首先我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素
idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)
现在不再需要盲目查找,因为我们确切地知道排序数组的哪个切片对应于每个唯一函数。所以我们仍然循环遍历每个调用的函数,但不调用 where:
for func_index in range(len(unique_funcs)-1):
idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这涵盖了除最终索引之外的所有索引,由于 Python 索引约定,我们需要单独调用它有点烦人:
func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])
这给出了与 where 循环(簿记完整性检查)相同的结果,但此循环的运行时间为 0.027 秒,比我最初的计算速度快了 13 倍。
这与您的(优秀!)自我回答几乎相同,但不那么繁琐。在我的机器上它似乎也稍微快了一点——基于粗略的 test.
大约 30 毫秒def apply_indexed_fast(array, func_indices, func_table):
func_argsort = func_indices.argsort()
func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
func_ranges.append(None)
out = np.zeros_like(array)
for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
ix = func_argsort[start:end]
out[ix] = f(array[ix])
return out
像你的一样,这将一系列 argsort
索引分成块,每个块对应于 func_table
中的一个函数。然后它使用每个块为其相应函数 select 输入和输出索引。为了确定块边界,它使用 np.searchsorted
而不是 np.unique
—— 其中 searchsorted(a, b)
可以被认为是二进制搜索算法, returns 中第一个值的索引a
等于或大于给定值或 b
中的值。
然后 zip 函数简单地并行迭代它的参数,从每个参数返回一个项目,将它们收集在一个元组中,然后将它们串在一起成一个列表。 (所以 zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd'])
returns [(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')]
。)这个连同 for
语句对 "unpack" 这些元组的内置能力,允许一种简洁但富有表现力的方式并行迭代多个序列。
在这种情况下,我用它来遍历 func_tables
中的函数以及 func_ranges
的两个不同步副本。这确保 end
变量中 func_ranges
的项目始终比 start
变量中的项目领先一步。通过将 None
附加到 func_ranges
,我确保最终块得到妥善处理——zip
在其任何一个参数用完项目时停止,这会切断顺序。方便的是,None
值也用作开放式切片索引!
另一个做同样事情的技巧需要多几行,但内存开销较低,尤其是与 zip
、izip
等价的 itertools
一起使用时:
range_iter_a = iter(func_ranges) # create generators that iterate over the
range_iter_b = iter(func_ranges) # values in `func_ranges` without making copies
next(range_iter_b, None) # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
...
但是,这些基于生成器的低开销方法有时可能比原始列表慢一点。另外,请注意在 Python 3 中,zip
的行为更像 izip
。