给定 numpy 中的索引列表添加元素的最有效方法
Most efficient way of adding elements given the index list in numpy
假设我们有一个形状为 (N, ) 的 numpy 数组 A 和一个形状为 (M, 3) 且具有数据的矩阵 D 以及另一个形状为 (M, 3) 且具有每个数据对应索引的矩阵 I D中的元素。我们如何构造给定D和I的A,以便添加重复的元素索引?
示例:
############# A[I] := D ###################################
A = [0.5, 0.6] # Final Reduced Data Vector
D = [[0.1, 0.1 0.2], [0.2, 0.4, 0.1]] # Data
I = [[0, 1, 0], [0, 1, 1]] # Indices
例如:
A[0] = D[0][0] + D[0][2] + D[1][0] # 0.5 = 0.1 + 0.2 + 0.2
因为在索引矩阵中我们有:
I[0][0] = I[0][2] = I[1][0] = 0
目标是避免遍历所有元素以高效处理大 N、M (10^6-10^9)。
I
和 D
的形状无关紧要:您可以在不改变结果的情况下清楚地分解数组:
index = np.ravel(I)
data = np.ravel(D)
现在您可以根据 I
:
对两个数组进行排序
sorter = np.argsort(index)
index = index[sorter]
data = data[sorter]
这很有用,因为现在 index
看起来像这样:
0, 0, 0, 1, 1, 1
而data
是这样的:
0.1, 0.2, 0.2, 0.1, 0.4, 0.1
将 运行 个连续数字相加应该比处理随机位置更容易。让我们从找到 运行s 开始的索引开始:
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
现在您可以利用像 np.add
这样的 ufunc 有一个名为 reduceat
的部分 reduce
操作这一事实。这允许您对数组的区域求和:
a = np.add.reduceat(data, runs)
如果 I
保证包含 [0, A.size
) 中的所有索引至少一次,你就完成了:只需分配给 A
而不是 a
.如果不是,您可以使用 index
中每个 运行 的开头是目标索引这一事实来进行映射:
A = np.zeros(n)
A[index[runs]] = a
算法复杂度分析:
ravel
时间复杂度为 O(1),如果数据在数组中则为 space。如果它是一个列表,这在时间上是 O(MN) 并且 space
argsort
在时间上是 O(MN log MN) 而 O(MN)
在 space 上
sorter
的索引在时间上是 O(MN) 并且 space
- 计算
runs
时间复杂度为 O(MN),时间复杂度为 O(MN + M) = O(MN) space
reduceat
是单次传递:时间为 O(MN),space 为 O(M)
- 重新分配
A
时间为 O(M) 并且 space
总计:O(MN log MN) 时间,O(MN) space
TL;DR
def make_A(D, I, M):
index = np.ravel(I)
data = np.ravel(D)
sorter = np.argsort(index)
index = index[sorter]
if index[0] < 0 or index[-1] >= M:
raise ValueError('Bad indices')
data = data[sorter]
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
a = np.add.reduceat(data, runs)
if a.size == M:
return a
A = np.zeros(M)
A[index[runs]] = a
return A
我怀疑你能比 np.bincount
快得多 - 请注意 official documentation 如何提供这个确切的用例
# Your example
A = [0.5, 0.6]
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
# Solution
import numpy as np
D, I = np.array(D).flatten(), np.array(I).flatten()
print(np.bincount(I, D)) #[0.5 0.6]
如果你事先知道 A 的大小,你可以简单地使用 add.at:
import numpy as np
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
arr_D = np.array(D)
arr_I = np.array(I)
A = np.zeros(2)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
如果不知道A的大小,可以用max计算:
A = np.zeros(arr_I.max() + 1)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
这个算法的时间复杂度是O(N),还有space复杂度O(N)。
那个:
arr_I.max() + 1
是 bincount 在幕后所做的,来自 documentation:
The result of binning the input array. The length of out is equal to
np.amax(x)+1.
也就是说,bincount 至少快一个数量级:
I = np.random.choice(1000, size=(1000, 3), replace=True)
D = np.random.random((1000, 3))
%timeit make_A_with_at(I, D, 1000)
213 µs ± 25 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit make_A_with_bincount(I, D)
11 µs ± 15.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
假设我们有一个形状为 (N, ) 的 numpy 数组 A 和一个形状为 (M, 3) 且具有数据的矩阵 D 以及另一个形状为 (M, 3) 且具有每个数据对应索引的矩阵 I D中的元素。我们如何构造给定D和I的A,以便添加重复的元素索引?
示例:
############# A[I] := D ###################################
A = [0.5, 0.6] # Final Reduced Data Vector
D = [[0.1, 0.1 0.2], [0.2, 0.4, 0.1]] # Data
I = [[0, 1, 0], [0, 1, 1]] # Indices
例如:
A[0] = D[0][0] + D[0][2] + D[1][0] # 0.5 = 0.1 + 0.2 + 0.2
因为在索引矩阵中我们有:
I[0][0] = I[0][2] = I[1][0] = 0
目标是避免遍历所有元素以高效处理大 N、M (10^6-10^9)。
I
和 D
的形状无关紧要:您可以在不改变结果的情况下清楚地分解数组:
index = np.ravel(I)
data = np.ravel(D)
现在您可以根据 I
:
sorter = np.argsort(index)
index = index[sorter]
data = data[sorter]
这很有用,因为现在 index
看起来像这样:
0, 0, 0, 1, 1, 1
而data
是这样的:
0.1, 0.2, 0.2, 0.1, 0.4, 0.1
将 运行 个连续数字相加应该比处理随机位置更容易。让我们从找到 运行s 开始的索引开始:
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
现在您可以利用像 np.add
这样的 ufunc 有一个名为 reduceat
的部分 reduce
操作这一事实。这允许您对数组的区域求和:
a = np.add.reduceat(data, runs)
如果 I
保证包含 [0, A.size
) 中的所有索引至少一次,你就完成了:只需分配给 A
而不是 a
.如果不是,您可以使用 index
中每个 运行 的开头是目标索引这一事实来进行映射:
A = np.zeros(n)
A[index[runs]] = a
算法复杂度分析:
ravel
时间复杂度为 O(1),如果数据在数组中则为 space。如果它是一个列表,这在时间上是 O(MN) 并且 spaceargsort
在时间上是 O(MN log MN) 而O(MN)
在 space 上
sorter
的索引在时间上是 O(MN) 并且 space- 计算
runs
时间复杂度为 O(MN),时间复杂度为 O(MN + M) = O(MN) space reduceat
是单次传递:时间为 O(MN),space 为 O(M)
- 重新分配
A
时间为 O(M) 并且 space
总计:O(MN log MN) 时间,O(MN) space
TL;DR
def make_A(D, I, M):
index = np.ravel(I)
data = np.ravel(D)
sorter = np.argsort(index)
index = index[sorter]
if index[0] < 0 or index[-1] >= M:
raise ValueError('Bad indices')
data = data[sorter]
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
a = np.add.reduceat(data, runs)
if a.size == M:
return a
A = np.zeros(M)
A[index[runs]] = a
return A
我怀疑你能比 np.bincount
快得多 - 请注意 official documentation 如何提供这个确切的用例
# Your example
A = [0.5, 0.6]
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
# Solution
import numpy as np
D, I = np.array(D).flatten(), np.array(I).flatten()
print(np.bincount(I, D)) #[0.5 0.6]
如果你事先知道 A 的大小,你可以简单地使用 add.at:
import numpy as np
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
arr_D = np.array(D)
arr_I = np.array(I)
A = np.zeros(2)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
如果不知道A的大小,可以用max计算:
A = np.zeros(arr_I.max() + 1)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
这个算法的时间复杂度是O(N),还有space复杂度O(N)。
那个:
arr_I.max() + 1
是 bincount 在幕后所做的,来自 documentation:
The result of binning the input array. The length of out is equal to np.amax(x)+1.
也就是说,bincount 至少快一个数量级:
I = np.random.choice(1000, size=(1000, 3), replace=True)
D = np.random.random((1000, 3))
%timeit make_A_with_at(I, D, 1000)
213 µs ± 25 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit make_A_with_bincount(I, D)
11 µs ± 15.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)