向量化 NumPy 数组中的迭代加法
Vectorize iterative addition in NumPy arrays
对于二维索引的随机数组中的每个元素(可能有重复项),我想“+=1”到二维零数组中的相应网格。但是,我不知道如何优化计算。使用标准的 for 循环,如此处所示,
def interadd():
U = 100
input = np.random.random(size=(5000,2)) * U
idx = np.floor(input).astype(np.int)
grids = np.zeros((U,U))
for i in range(len(input)):
grids[idx[i,0],idx[i,1]] += 1
return grids
运行时间可能非常重要:
>> timeit(interadd, number=5000)
43.69953393936157
有没有办法向量化这个迭代过程?
您可以使用 np.add.at
稍微加快它的速度,它可以正确处理重复索引的情况:
def interadd(U, idx):
grids = np.zeros((U,U))
for i in range(len(idx)):
grids[idx[i,0],idx[i,1]] += 1
return grids
def interadd2(U, idx):
grids = np.zeros((U,U))
np.add.at(grids, idx.T.tolist(), 1)
return grids
def interadd3(U, idx):
# YXD suggestion
grids = np.zeros((U,U))
np.add.at(grids, (idx[:,0], idx[:,1]), 1)
return grids
这给出了
>>> U = 100
>>> idx = np.floor(np.random.random(size=(5000,2))*U).astype(np.int)
>>> (interadd(U, idx) == interadd2(U, idx)).all()
True
>>> %timeit interadd(U, idx)
100 loops, best of 3: 8.48 ms per loop
>>> %timeit interadd2(U, idx)
100 loops, best of 3: 2.62 ms per loop
以及YXD的建议:
>>> (interadd(U, idx) == interadd3(U, idx)).all()
True
>>> %timeit interadd3(U, idx)
1000 loops, best of 3: 1.09 ms per loop
您可以将 R,C
索引从 idx
转换为线性索引,然后找出唯一的索引及其计数,最后将它们存储在输出 grids
中作为最终输出。这是实现相同目标的实现 -
# Calculate linear indices corressponding to idx
lin_idx = idx[:,0]*U + idx[:,1]
# Get unique linear indices and their counts
unq_lin_idx,idx_counts = np.unique(lin_idx,return_counts=True)
# Setup output array and store index counts in raveled/flattened version
grids = np.zeros((U,U))
grids.ravel()[unq_lin_idx] = idx_counts
运行时测试 -
以下是涵盖所有方法(包括 )并使用与该解决方案中列出的相同定义的运行时测试 -
In [63]: U = 100
...: idx = np.floor(np.random.random(size=(5000,2))*U).astype(np.int)
...:
In [64]: %timeit interadd(U, idx)
100 loops, best of 3: 7.57 ms per loop
In [65]: %timeit interadd2(U, idx)
100 loops, best of 3: 2.59 ms per loop
In [66]: %timeit interadd3(U, idx)
1000 loops, best of 3: 1.24 ms per loop
In [67]: def unique_counts(U, idx):
...: lin_idx = idx[:,0]*U + idx[:,1]
...: unq_lin_idx,idx_counts = np.unique(lin_idx,return_counts=True)
...: grids = np.zeros((U,U))
...: grids.ravel()[unq_lin_idx] = idx_counts
...: return grids
...:
In [68]: %timeit unique_counts(U, idx)
1000 loops, best of 3: 595 µs per loop
运行时表明所提议的基于 np.unique
的方法比第二快的方法快 50% 以上。
Divakar 的回答让我尝试了以下方法,这看起来是目前最快的方法:
lin_idx = idx[:,0]*U + idx[:,1]
grids = np.bincount(lin_idx, minlength=U**2).reshape(U, U)
时间安排:
In [184]: U = 100
...: input = np.random.random(size=(5000,2)) * U
...: idx = np.floor(input).astype(np.int)
In [185]: %timeit interadd3(U, idx) # By DSM / XYD
1000 loops, best of 3: 1.68 ms per loop
In [186]: %timeit unique_counts(U, idx) # By Divakar
1000 loops, best of 3: 676 µs per loop
In [187]: %%timeit
...: lin_idx = idx[:,0]*U + idx[:,1]
...: grids = np.bincount(lin_idx, minlength=U*U).reshape(U, U)
...:
10000 loops, best of 3: 97.5 µs per loop
对于二维索引的随机数组中的每个元素(可能有重复项),我想“+=1”到二维零数组中的相应网格。但是,我不知道如何优化计算。使用标准的 for 循环,如此处所示,
def interadd():
U = 100
input = np.random.random(size=(5000,2)) * U
idx = np.floor(input).astype(np.int)
grids = np.zeros((U,U))
for i in range(len(input)):
grids[idx[i,0],idx[i,1]] += 1
return grids
运行时间可能非常重要:
>> timeit(interadd, number=5000)
43.69953393936157
有没有办法向量化这个迭代过程?
您可以使用 np.add.at
稍微加快它的速度,它可以正确处理重复索引的情况:
def interadd(U, idx):
grids = np.zeros((U,U))
for i in range(len(idx)):
grids[idx[i,0],idx[i,1]] += 1
return grids
def interadd2(U, idx):
grids = np.zeros((U,U))
np.add.at(grids, idx.T.tolist(), 1)
return grids
def interadd3(U, idx):
# YXD suggestion
grids = np.zeros((U,U))
np.add.at(grids, (idx[:,0], idx[:,1]), 1)
return grids
这给出了
>>> U = 100
>>> idx = np.floor(np.random.random(size=(5000,2))*U).astype(np.int)
>>> (interadd(U, idx) == interadd2(U, idx)).all()
True
>>> %timeit interadd(U, idx)
100 loops, best of 3: 8.48 ms per loop
>>> %timeit interadd2(U, idx)
100 loops, best of 3: 2.62 ms per loop
以及YXD的建议:
>>> (interadd(U, idx) == interadd3(U, idx)).all()
True
>>> %timeit interadd3(U, idx)
1000 loops, best of 3: 1.09 ms per loop
您可以将 R,C
索引从 idx
转换为线性索引,然后找出唯一的索引及其计数,最后将它们存储在输出 grids
中作为最终输出。这是实现相同目标的实现 -
# Calculate linear indices corressponding to idx
lin_idx = idx[:,0]*U + idx[:,1]
# Get unique linear indices and their counts
unq_lin_idx,idx_counts = np.unique(lin_idx,return_counts=True)
# Setup output array and store index counts in raveled/flattened version
grids = np.zeros((U,U))
grids.ravel()[unq_lin_idx] = idx_counts
运行时测试 -
以下是涵盖所有方法(包括
In [63]: U = 100
...: idx = np.floor(np.random.random(size=(5000,2))*U).astype(np.int)
...:
In [64]: %timeit interadd(U, idx)
100 loops, best of 3: 7.57 ms per loop
In [65]: %timeit interadd2(U, idx)
100 loops, best of 3: 2.59 ms per loop
In [66]: %timeit interadd3(U, idx)
1000 loops, best of 3: 1.24 ms per loop
In [67]: def unique_counts(U, idx):
...: lin_idx = idx[:,0]*U + idx[:,1]
...: unq_lin_idx,idx_counts = np.unique(lin_idx,return_counts=True)
...: grids = np.zeros((U,U))
...: grids.ravel()[unq_lin_idx] = idx_counts
...: return grids
...:
In [68]: %timeit unique_counts(U, idx)
1000 loops, best of 3: 595 µs per loop
运行时表明所提议的基于 np.unique
的方法比第二快的方法快 50% 以上。
Divakar 的回答让我尝试了以下方法,这看起来是目前最快的方法:
lin_idx = idx[:,0]*U + idx[:,1]
grids = np.bincount(lin_idx, minlength=U**2).reshape(U, U)
时间安排:
In [184]: U = 100
...: input = np.random.random(size=(5000,2)) * U
...: idx = np.floor(input).astype(np.int)
In [185]: %timeit interadd3(U, idx) # By DSM / XYD
1000 loops, best of 3: 1.68 ms per loop
In [186]: %timeit unique_counts(U, idx) # By Divakar
1000 loops, best of 3: 676 µs per loop
In [187]: %%timeit
...: lin_idx = idx[:,0]*U + idx[:,1]
...: grids = np.bincount(lin_idx, minlength=U*U).reshape(U, U)
...:
10000 loops, best of 3: 97.5 µs per loop