给定 numpy 中的索引列表添加元素的最有效方法

Most efficient way of adding elements given the index list in numpy

假设我们有一个形状为 (N, ) 的 numpy 数组 A 和一个形状为 (M, 3) 且具有数据的矩阵 D 以及另一个形状为 (M, 3) 且具有每个数据对应索引的矩阵 I D中的元素。我们如何构造给定D和I的A,以便添加重复的元素索引?

示例:

############# A[I] := D ###################################  
A = [0.5, 0.6]                         # Final Reduced Data Vector
D = [[0.1, 0.1 0.2], [0.2, 0.4, 0.1]]  # Data
I = [[0, 1, 0], [0, 1, 1]]             # Indices

例如:

A[0] = D[0][0] + D[0][2] + D[1][0]     # 0.5 = 0.1 + 0.2 + 0.2

因为在索引矩阵中我们有:

I[0][0] = I[0][2] = I[1][0] = 0

目标是避免遍历所有元素以高效处理大 N、M (10^6-10^9)。

ID 的形状无关紧要:您可以在不改变结果的情况下清楚地分解数组:

index = np.ravel(I)
data = np.ravel(D)

现在您可以根据 I:

对两个数组进行排序
sorter = np.argsort(index)
index = index[sorter]
data = data[sorter]

这很有用,因为现在 index 看起来像这样:

0, 0, 0, 1, 1, 1

data是这样的:

0.1, 0.2, 0.2, 0.1, 0.4, 0.1

将 运行 个连续数字相加应该比处理随机位置更容易。让我们从找到 运行s 开始的索引开始:

runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]

现在您可以利用像 np.add 这样的 ufunc 有一个名为 reduceat 的部分 reduce 操作这一事实。这允许您对数组的区域求和:

a = np.add.reduceat(data, runs)

如果 I 保证包含 [0, A.size) 中的所有索引至少一次,你就完成了:只需分配给 A 而不是 a.如果不是,您可以使用 index 中每个 运行 的开头是目标索引这一事实来进行映射:

A = np.zeros(n)
A[index[runs]] = a

算法复杂度分析:

  • ravel 时间复杂度为 O(1),如果数据在数组中则为 space。如果它是一个列表,这在时间上是 O(MN) 并且 space
  • argsort 在时间上是 O(MN log MN) 而 O(MN) 在 space
  • sorter 的索引在时间上是 O(MN) 并且 space
  • 计算 runs 时间复杂度为 O(MN),时间复杂度为 O(MN + M) = O(MN) space
  • reduceat 是单次传递:时间为 O(MN),space
  • 为 O(M)
  • 重新分配 A 时间为 O(M) 并且 space

总计:O(MN log MN) 时间,O(MN) space

TL;DR

def make_A(D, I, M):
    index = np.ravel(I)
    data = np.ravel(D)
    sorter = np.argsort(index)
    index = index[sorter]

    if index[0] < 0 or index[-1] >= M:
        raise ValueError('Bad indices')

    data = data[sorter]
    runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
    a = np.add.reduceat(data, runs)
    if a.size == M:
        return a
    A = np.zeros(M)
    A[index[runs]] = a
    return A

我怀疑你能比 np.bincount 快得多 - 请注意 official documentation 如何提供这个确切的用例

# Your example
A = [0.5, 0.6]
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]

# Solution
import numpy as np    
D, I = np.array(D).flatten(), np.array(I).flatten()
print(np.bincount(I, D)) #[0.5 0.6]

如果你事先知道 A 的大小,你可以简单地使用 add.at:

import numpy as np

D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]

arr_D = np.array(D)
arr_I = np.array(I)

A = np.zeros(2)

np.add.at(A, arr_I, arr_D)

print(A)

输出

[0.5 0.6]

如果不知道A的大小,可以用max计算:

A = np.zeros(arr_I.max() + 1)
np.add.at(A, arr_I, arr_D)
print(A)

输出

[0.5 0.6]

这个算法的时间复杂度是O(N),还有space复杂度O(N)

那个:

arr_I.max() + 1

是 bincount 在幕后所做的,来自 documentation:

The result of binning the input array. The length of out is equal to np.amax(x)+1.

也就是说,bincount 至少快一个数量级:

I = np.random.choice(1000, size=(1000, 3), replace=True)
D = np.random.random((1000, 3))
%timeit make_A_with_at(I, D, 1000)
213 µs ± 25 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit make_A_with_bincount(I, D)
11 µs ± 15.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)