How to get sum of values in a numpy array based on another array with repetitive indices

data_values = np.random.rand(10)
data_ind = np.random.randint(0,10,10)
    
data_values = array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973  ,
        0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])

data_ind = array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

Desired output:

0 - 0.916973  
1 - 0.36218693  
2 - 0.54130794 + 0.22339518  
3 - 0.47195214  
4 - 0.17958372  
5 - 0.57734696  
6 - 0.14956333 + 0.74504583  
7 - 0.81444589  
output = array([0.916973, 0.36218694, 0.7647031, 0.47195214, 0.17958371, 0.577347, 0.89460915, 0.8144459], dtype=float32)

I wrote it the long way:

nodal_values = np.zeros(8, dtype=np.float32)  
for nodes in range(8):  
    nodal_values[nodes] = np.sum(data_values[np.where(data == nodes)[0]])

The approach above takes a lot of time, while

a = ((np.mgrid[:M,:N] == b)[0] * c).sum(axis=1)

gives a memory error for big data with millions of entries.

I am looking for an optimized way to do this.




Original code

This is what you want to optimize for large values of N (I took the liberty of editing your code so that it has no hard-coded values, and fixed a typo: data_ind instead of data):

data_values = np.random.rand(N) 
data_ind = np.random.randint(0, N, N)

xsize = data_ind.max() + 1
nodal_values = np.zeros(xsize, dtype=np.float32)  
for nodes in range(xsize):  
    nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

Slightly better version (for readability)

I created the following version, which improves readability and drops the use of np.where:

idx = np.arange(xsize)[:, None] == data_ind
nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)] # Python list

Much better version

I implemented @Divakar's accepted answer for your case (be sure to check it out to understand it better):

_, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
nodal_values = np.bincount(idx, data_values) # Same shape and type as your version
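As an aside (my own addition, not part of the original answer): when data_ind already holds small non-negative integers, as it does here, np.bincount can consume the indices directly through its weights argument, skipping np.unique entirely. A minimal sketch with the question's data:

```python
import numpy as np

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973,
                        0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

# Each value is added into the bin named by its index; minlength guards
# against trailing node numbers that never occur in data_ind.
nodal_values = np.bincount(data_ind, weights=data_values,
                           minlength=data_ind.max() + 1)
```

Note that, unlike the np.unique route, this keeps explicit zero entries for indices that never occur, so the result lines up positionally with the node numbers.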

Comparison

Using your original values:

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973, 0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

I measured the following performance with the timeit module (mean ± std. dev. of 7 runs, 10000000 loops each):

Original code: 49.2 +- 11.1 ns
Much better version: 45.2 +- 4.98 ns
Slightly better version: 36.4 +- 2.81 ns

For very small values of N, i.e. 1 to 10, there is no significant difference. For large N, however, there is no question about which one to use: both versions with for loops take a long time, while the vectorized implementation is very fast.
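One more vectorized candidate worth benchmarking (my own suggestion, not part of the original answer) is np.add.at, which performs an unbuffered in-place scatter-add. A minimal sketch:

```python
import numpy as np

rng = np.random.default_rng(0)
data_values = rng.random(10)
data_ind = rng.integers(0, 10, 10)

nodal_values = np.zeros(data_ind.max() + 1)
# Unlike `nodal_values[data_ind] += data_values`, np.add.at accumulates
# correctly when the same index appears more than once.
np.add.at(nodal_values, data_ind, data_values)
```

It tends to be slower than np.bincount for this pattern, but it generalizes to any ufunc and to multidimensional targets.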

Test code

import numpy as np
import timeit
import matplotlib.pyplot as plt

def original_code():
    xsize = data_ind.max() + 1
    nodal_values = np.zeros(xsize, dtype=np.float32)
    for nodes in range(xsize):
        nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

def much_better():
    _, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
    nodal_values = np.bincount(idx, data_values)

def slightly_better():
    xsize = data_ind.max() + 1
    idx = np.arange(xsize)[:, None] == data_ind
    nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)]

sizes = [i*5 for i in range(1, 7)]
original_code_times = np.zeros((len(sizes),))
slightly_better_times = np.zeros((len(sizes),))
much_better_times = np.zeros((len(sizes),))
for i, N in enumerate(sizes):
    print(N)
    data_values = np.random.rand(N)
    data_ind = np.random.randint(0, N, N)

    # Divided by 100 repeats to get average
    original_code_times[i] = timeit.timeit(original_code, number=100) / 100
    much_better_times[i] = timeit.timeit(much_better, number=100) / 100
    slightly_better_times[i] = timeit.timeit(slightly_better, number=100) / 100

# Multiply by 1000 to get everything in ms
original_code_times *= 1000
slightly_better_times *= 1000
much_better_times *= 1000

# %%
plt.figure(dpi=120)
plt.title("Small N's")
plt.plot(sizes, original_code_times, label="Original code")
plt.plot(sizes, slightly_better_times, label="Slightly better")
plt.plot(sizes, much_better_times, label="Much better")
plt.ylabel("Time [ms]")
plt.xlabel("N")
plt.xticks(sizes)
plt.legend()
plt.savefig("small_N.png", dpi=120)
plt.show()
plt.close()

I hope this helps anyone who may stumble upon this question.