代码优化 - Python 中的函数调用次数
Code optimization - number of function calls in Python
我想知道如何解决这个问题以减少代码中 np.sum()
函数调用的开销。
我有一个 input
矩阵,比如 shape=(1000, 36)
。每行代表图中的一个节点。我有一个正在执行的操作,它遍历每一行并对可变数量的其他行进行逐元素添加。这些 "other" 行在字典 nodes_nbrs
中定义,该字典为每一行记录了一个必须加在一起的行列表。一个例子是这样的:
nodes_nbrs = {0: [0, 1],
1: [1, 0, 2],
2: [2, 1],
...}
在这里,节点 0
将被转换为节点 0
和 1
的总和。节点 1
将被转换为节点 1
、0
和 2
的总和。其余节点依此类推。
我目前实施的当前(和幼稚)方式就是这样。我首先实例化一个我想要的最终形状的零数组,然后遍历 nodes_nbrs
字典中的每个键值对:
output = np.zeros(shape=input.shape)
for k, v in nodes_nbrs.items():
output[k] = np.sum(input[v], axis=0)
此代码在小型测试 (shape=(1000, 36)
) 中非常出色,但在大型测试 (shape=(~1E(5-6), 36)
) 中,需要约 2-3 秒才能完成。我最终不得不执行此操作数千次,所以我想看看是否有更优化的方法来执行此操作。
在进行线路分析后,我注意到这里的关键杀手是一遍又一遍地调用 np.sum
函数,这大约占总时间的 50%。有没有办法可以消除这种开销?或者还有其他方法可以优化它吗?
除此之外,这里列出了我所做的事情,以及(非常简短地)他们的结果:
- A
cython
版本:消除了 for
循环类型检查开销,减少了 30% 的时间。对于 cython
版本,np.sum
大约占挂钟总时间的 80%,而不是 50%。
- 预先将
np.sum
声明为变量npsum
,然后在for
循环中调用npsum
。与原版无区别
- 将
np.sum
替换为np.add.reduce
,并将其赋值给变量npsum
,然后在for
循环中调用npsum
。挂钟时间减少约 10%,但随后与 autograd
不兼容(下面在稀疏矩阵要点中进行解释)。
numba
JIT-ing: 没有尝试添加装饰器。没有进步,但没有更努力。
- 将
nodes_nbrs
字典转换为密集的numpy
二进制数组(1s和0s),然后进行单个np.dot
操作。理论上好,实际上不好,因为它需要 shape=(10^n, 10^n)
的方阵,这在内存使用方面是二次的。
我还没有尝试过,但犹豫是否要这样做的事情:
scipy
稀疏矩阵:我使用的是autograd
,它不支持scipy
稀疏矩阵的dot
操作的自动微分。
对于那些好奇的人来说,这本质上是对图结构数据的卷积运算。为研究生院开发这个有点有趣,但在知识的前沿也有点令人沮丧。
如果 scipy.sparse 不是一个选项,您可能采用的一种方法是处理您的数据,以便您可以使用矢量化函数来完成编译层中的所有操作。如果将 neighbors 字典更改为带有缺失值适当标志的二维数组,则可以使用 np.take
提取所需数据,然后执行单个 sum()
调用。
这是我的想法的一个例子:
import numpy as np
def make_data(N=100):
X = np.random.randint(1, 20, (N, 36))
connections = np.random.randint(2, 5, N)
nbrs = {i: list(np.random.choice(N, c))
for i, c in enumerate(connections)}
return X, nbrs
def original_solution(X, nbrs):
output = np.zeros(shape=X.shape)
for k, v in nbrs.items():
output[k] = np.sum(X[v], axis=0)
return output
def vectorized_solution(X, nbrs):
# Make neighbors all the same length, filling with -1
new_nbrs = np.full((X.shape[0], max(map(len, nbrs.values()))), -1, dtype=int)
for i, v in nbrs.items():
new_nbrs[i, :len(v)] = v
# add a row of zeros to X
new_X = np.vstack([X, 0 * X[0]])
# compute the sums
return new_X.take(new_nbrs, 0).sum(1)
现在我们可以确认结果匹配:
>>> X, nbrs = make_data(100)
>>> np.allclose(original_solution(X, nbrs),
vectorized_solution(X, nbrs))
True
我们可以计时以查看加速:
X, nbrs = make_data(1000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
# 100 loops, best of 3: 13.7 ms per loop
# 100 loops, best of 3: 1.89 ms per loop
增加尺寸:
X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
1 loop, best of 3: 1.42 s per loop
1 loop, best of 3: 249 ms per loop
它大约快了 5-10 倍,这可能足以满足您的目的(尽管这在很大程度上取决于您的 nbrs
词典的确切特征)。
编辑: 只是为了好玩,我尝试了其他几种方法,一种使用 numpy.add.reduceat
,一种使用 pandas.groupby
,一种使用 [=23] =].看来我最初在上面提出的矢量化方法可能是最好的选择。供参考:
from itertools import chain
def reduceat_solution(X, nbrs):
ind, j = np.transpose([[i, len(v)] for i, v in nbrs.items()])
i = list(chain(*(nbrs[i] for i in ind)))
j = np.concatenate([[0], np.cumsum(j)[:-1]])
return np.add.reduceat(X[i], j)[ind]
np.allclose(original_solution(X, nbrs),
reduceat_solution(X, nbrs))
# True
-
import pandas as pd
def groupby_solution(X, nbrs):
i, j = np.transpose([[k, vi] for k, v in nbrs.items() for vi in v])
return pd.groupby(pd.DataFrame(X[j]), i).sum().values
np.allclose(original_solution(X, nbrs),
groupby_solution(X, nbrs))
# True
-
from scipy.sparse import csr_matrix
from itertools import chain
def sparse_solution(X, nbrs):
items = (([i]*len(col), col, [1]*len(col)) for i, col in nbrs.items())
rows, cols, data = (np.array(list(chain(*a))) for a in zip(*items))
M = csr_matrix((data, (rows, cols)))
return M.dot(X)
np.allclose(original_solution(X, nbrs),
sparse_solution(X, nbrs))
# True
所有时间加在一起:
X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
%timeit reduceat_solution(X, nbrs)
%timeit groupby_solution(X, nbrs)
%timeit sparse_solution(X, nbrs)
# 1 loop, best of 3: 1.46 s per loop
# 1 loop, best of 3: 268 ms per loop
# 1 loop, best of 3: 416 ms per loop
# 1 loop, best of 3: 657 ms per loop
# 1 loop, best of 3: 282 ms per loop
基于对最近稀疏问题的研究,例如
下面介绍了如何使用稀疏矩阵解决您的问题。该方法可能同样适用于密集的。这个想法是将稀疏 sum
实现为具有 1 的行(或列)的矩阵乘积。稀疏矩阵的索引很慢,但矩阵乘积是好的 C 代码。
在这种情况下,我将构建一个乘法矩阵,其中我想要求和的行为 1 - 字典中的每个条目都有不同的 1 组。
示例矩阵:
In [302]: A=np.arange(8*3).reshape(8,3)
In [303]: M=sparse.csr_matrix(A)
选择词典:
In [304]: dict={0:[0,1],1:[1,0,2],2:[2,1],3:[3,4,7]}
根据该字典构建一个稀疏矩阵。这可能不是构造此类矩阵的最有效方法,但足以证明这个想法。
In [305]: r,c,d=[],[],[]
In [306]: for i,col in dict.items():
c.extend(col)
r.extend([i]*len(col))
d.extend([1]*len(col))
In [307]: r,c,d
Out[307]:
([0, 0, 1, 1, 1, 2, 2, 3, 3, 3],
[0, 1, 1, 0, 2, 2, 1, 3, 4, 7],
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
In [308]: idx=sparse.csr_matrix((d,(r,c)),shape=(len(dict),M.shape[0]))
执行求和并查看结果(作为密集数组):
In [310]: (idx*M).A
Out[310]:
array([[ 3, 5, 7],
[ 9, 12, 15],
[ 9, 11, 13],
[42, 45, 48]], dtype=int32)
这里是原版,用于对比。
In [312]: M.A
Out[312]:
array([[ 0, 1, 2],
[ 3, 4, 5],
[ 6, 7, 8],
[ 9, 10, 11],
[12, 13, 14],
[15, 16, 17],
[18, 19, 20],
[21, 22, 23]], dtype=int32)
我想知道如何解决这个问题以减少代码中 np.sum()
函数调用的开销。
我有一个 input
矩阵,比如 shape=(1000, 36)
。每行代表图中的一个节点。我有一个正在执行的操作,它遍历每一行并对可变数量的其他行进行逐元素添加。这些 "other" 行在字典 nodes_nbrs
中定义,该字典为每一行记录了一个必须加在一起的行列表。一个例子是这样的:
nodes_nbrs = {0: [0, 1],
1: [1, 0, 2],
2: [2, 1],
...}
在这里,节点 0
将被转换为节点 0
和 1
的总和。节点 1
将被转换为节点 1
、0
和 2
的总和。其余节点依此类推。
我目前实施的当前(和幼稚)方式就是这样。我首先实例化一个我想要的最终形状的零数组,然后遍历 nodes_nbrs
字典中的每个键值对:
output = np.zeros(shape=input.shape)
for k, v in nodes_nbrs.items():
output[k] = np.sum(input[v], axis=0)
此代码在小型测试 (shape=(1000, 36)
) 中非常出色,但在大型测试 (shape=(~1E(5-6), 36)
) 中,需要约 2-3 秒才能完成。我最终不得不执行此操作数千次,所以我想看看是否有更优化的方法来执行此操作。
在进行线路分析后,我注意到这里的关键杀手是一遍又一遍地调用 np.sum
函数,这大约占总时间的 50%。有没有办法可以消除这种开销?或者还有其他方法可以优化它吗?
除此之外,这里列出了我所做的事情,以及(非常简短地)他们的结果:
- A
cython
版本:消除了for
循环类型检查开销,减少了 30% 的时间。对于cython
版本,np.sum
大约占挂钟总时间的 80%,而不是 50%。 - 预先将
np.sum
声明为变量npsum
,然后在for
循环中调用npsum
。与原版无区别 - 将
np.sum
替换为np.add.reduce
,并将其赋值给变量npsum
,然后在for
循环中调用npsum
。挂钟时间减少约 10%,但随后与autograd
不兼容(下面在稀疏矩阵要点中进行解释)。 numba
JIT-ing: 没有尝试添加装饰器。没有进步,但没有更努力。- 将
nodes_nbrs
字典转换为密集的numpy
二进制数组(1s和0s),然后进行单个np.dot
操作。理论上好,实际上不好,因为它需要shape=(10^n, 10^n)
的方阵,这在内存使用方面是二次的。
我还没有尝试过,但犹豫是否要这样做的事情:
scipy
稀疏矩阵:我使用的是autograd
,它不支持scipy
稀疏矩阵的dot
操作的自动微分。
对于那些好奇的人来说,这本质上是对图结构数据的卷积运算。为研究生院开发这个有点有趣,但在知识的前沿也有点令人沮丧。
如果 scipy.sparse 不是一个选项,您可能采用的一种方法是处理您的数据,以便您可以使用矢量化函数来完成编译层中的所有操作。如果将 neighbors 字典更改为带有缺失值适当标志的二维数组,则可以使用 np.take
提取所需数据,然后执行单个 sum()
调用。
这是我的想法的一个例子:
import numpy as np
def make_data(N=100):
X = np.random.randint(1, 20, (N, 36))
connections = np.random.randint(2, 5, N)
nbrs = {i: list(np.random.choice(N, c))
for i, c in enumerate(connections)}
return X, nbrs
def original_solution(X, nbrs):
output = np.zeros(shape=X.shape)
for k, v in nbrs.items():
output[k] = np.sum(X[v], axis=0)
return output
def vectorized_solution(X, nbrs):
# Make neighbors all the same length, filling with -1
new_nbrs = np.full((X.shape[0], max(map(len, nbrs.values()))), -1, dtype=int)
for i, v in nbrs.items():
new_nbrs[i, :len(v)] = v
# add a row of zeros to X
new_X = np.vstack([X, 0 * X[0]])
# compute the sums
return new_X.take(new_nbrs, 0).sum(1)
现在我们可以确认结果匹配:
>>> X, nbrs = make_data(100)
>>> np.allclose(original_solution(X, nbrs),
vectorized_solution(X, nbrs))
True
我们可以计时以查看加速:
X, nbrs = make_data(1000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
# 100 loops, best of 3: 13.7 ms per loop
# 100 loops, best of 3: 1.89 ms per loop
增加尺寸:
X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
1 loop, best of 3: 1.42 s per loop
1 loop, best of 3: 249 ms per loop
它大约快了 5-10 倍,这可能足以满足您的目的(尽管这在很大程度上取决于您的 nbrs
词典的确切特征)。
编辑: 只是为了好玩,我尝试了其他几种方法,一种使用 numpy.add.reduceat
,一种使用 pandas.groupby
,一种使用 [=23] =].看来我最初在上面提出的矢量化方法可能是最好的选择。供参考:
from itertools import chain
def reduceat_solution(X, nbrs):
ind, j = np.transpose([[i, len(v)] for i, v in nbrs.items()])
i = list(chain(*(nbrs[i] for i in ind)))
j = np.concatenate([[0], np.cumsum(j)[:-1]])
return np.add.reduceat(X[i], j)[ind]
np.allclose(original_solution(X, nbrs),
reduceat_solution(X, nbrs))
# True
-
import pandas as pd
def groupby_solution(X, nbrs):
i, j = np.transpose([[k, vi] for k, v in nbrs.items() for vi in v])
return pd.groupby(pd.DataFrame(X[j]), i).sum().values
np.allclose(original_solution(X, nbrs),
groupby_solution(X, nbrs))
# True
-
from scipy.sparse import csr_matrix
from itertools import chain
def sparse_solution(X, nbrs):
items = (([i]*len(col), col, [1]*len(col)) for i, col in nbrs.items())
rows, cols, data = (np.array(list(chain(*a))) for a in zip(*items))
M = csr_matrix((data, (rows, cols)))
return M.dot(X)
np.allclose(original_solution(X, nbrs),
sparse_solution(X, nbrs))
# True
所有时间加在一起:
X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
%timeit reduceat_solution(X, nbrs)
%timeit groupby_solution(X, nbrs)
%timeit sparse_solution(X, nbrs)
# 1 loop, best of 3: 1.46 s per loop
# 1 loop, best of 3: 268 ms per loop
# 1 loop, best of 3: 416 ms per loop
# 1 loop, best of 3: 657 ms per loop
# 1 loop, best of 3: 282 ms per loop
基于对最近稀疏问题的研究,例如
下面介绍了如何使用稀疏矩阵解决您的问题。该方法可能同样适用于密集的。这个想法是将稀疏 sum
实现为具有 1 的行(或列)的矩阵乘积。稀疏矩阵的索引很慢,但矩阵乘积是好的 C 代码。
在这种情况下,我将构建一个乘法矩阵,其中我想要求和的行为 1 - 字典中的每个条目都有不同的 1 组。
示例矩阵:
In [302]: A=np.arange(8*3).reshape(8,3)
In [303]: M=sparse.csr_matrix(A)
选择词典:
In [304]: dict={0:[0,1],1:[1,0,2],2:[2,1],3:[3,4,7]}
根据该字典构建一个稀疏矩阵。这可能不是构造此类矩阵的最有效方法,但足以证明这个想法。
In [305]: r,c,d=[],[],[]
In [306]: for i,col in dict.items():
c.extend(col)
r.extend([i]*len(col))
d.extend([1]*len(col))
In [307]: r,c,d
Out[307]:
([0, 0, 1, 1, 1, 2, 2, 3, 3, 3],
[0, 1, 1, 0, 2, 2, 1, 3, 4, 7],
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
In [308]: idx=sparse.csr_matrix((d,(r,c)),shape=(len(dict),M.shape[0]))
执行求和并查看结果(作为密集数组):
In [310]: (idx*M).A
Out[310]:
array([[ 3, 5, 7],
[ 9, 12, 15],
[ 9, 11, 13],
[42, 45, 48]], dtype=int32)
这里是原版,用于对比。
In [312]: M.A
Out[312]:
array([[ 0, 1, 2],
[ 3, 4, 5],
[ 6, 7, 8],
[ 9, 10, 11],
[12, 13, 14],
[15, 16, 17],
[18, 19, 20],
[21, 22, 23]], dtype=int32)