代码优化 - Python 中的函数调用次数

Code optimization - number of function calls in Python

我想知道如何解决这个问题以减少代码中 np.sum() 函数调用的开销。

我有一个 input 矩阵,比如 shape=(1000, 36)。每行代表图中的一个节点。我有一个正在执行的操作,它遍历每一行并对可变数量的其他行进行逐元素添加。这些 "other" 行在字典 nodes_nbrs 中定义,该字典为每一行记录了一个必须加在一起的行列表。一个例子是这样的:

nodes_nbrs = {0: [0, 1], 
              1: [1, 0, 2],
              2: [2, 1],
              ...}

在这里,节点 0 将被转换为节点 01 的总和。节点 1 将被转换为节点 102 的总和。其余节点依此类推。

我目前实施的当前(和幼稚)方式就是这样。我首先实例化一个我想要的最终形状的零数组,然后遍历 nodes_nbrs 字典中的每个键值对:

output = np.zeros(shape=input.shape)
for k, v in nodes_nbrs.items():
    output[k] = np.sum(input[v], axis=0)

此代码在小型测试 (shape=(1000, 36)) 中非常出色,但在大型测试 (shape=(~1E(5-6), 36)) 中,需要约 2-3 秒才能完成。我最终不得不执行此操作数千次,所以我想看看是否有更优化的方法来执行此操作。

在进行线路分析后,我注意到这里的关键杀手是一遍又一遍地调用 np.sum 函数,这大约占总时间的 50%。有没有办法可以消除这种开销?或者还有其他方法可以优化它吗?


除此之外,这里列出了我所做的事情,以及(非常简短地)他们的结果:

我还没有尝试过,但犹豫是否要这样做的事情:


对于那些好奇的人来说,这本质上是对图结构数据的卷积运算。为研究生院开发这个有点有趣,但在知识的前沿也有点令人沮丧。

如果 scipy.sparse 不是一个选项,您可能采用的一种方法是处理您的数据,以便您可以使用矢量化函数来完成编译层中的所有操作。如果将 neighbors 字典更改为带有缺失值适当标志的二维数组,则可以使用 np.take 提取所需数据,然后执行单个 sum() 调用。

这是我的想法的一个例子:

import numpy as np

def make_data(N=100):
    X = np.random.randint(1, 20, (N, 36))
    connections = np.random.randint(2, 5, N)
    nbrs = {i: list(np.random.choice(N, c))
            for i, c in enumerate(connections)}
    return X, nbrs

def original_solution(X, nbrs):
    output = np.zeros(shape=X.shape)
    for k, v in nbrs.items():
        output[k] = np.sum(X[v], axis=0)
    return output

def vectorized_solution(X, nbrs):
    # Make neighbors all the same length, filling with -1
    new_nbrs = np.full((X.shape[0], max(map(len, nbrs.values()))), -1, dtype=int)
    for i, v in nbrs.items():
        new_nbrs[i, :len(v)] = v

    # add a row of zeros to X
    new_X = np.vstack([X, 0 * X[0]])

    # compute the sums
    return new_X.take(new_nbrs, 0).sum(1)

现在我们可以确认结果匹配:

>>> X, nbrs = make_data(100)
>>> np.allclose(original_solution(X, nbrs),
                vectorized_solution(X, nbrs))
True

我们可以计时以查看加速:

X, nbrs = make_data(1000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
# 100 loops, best of 3: 13.7 ms per loop
# 100 loops, best of 3: 1.89 ms per loop

增加尺寸:

X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
1 loop, best of 3: 1.42 s per loop
1 loop, best of 3: 249 ms per loop

它大约快了 5-10 倍,这可能足以满足您的目的(尽管这在很大程度上取决于您的 nbrs 词典的确切特征)。


编辑: 只是为了好玩,我尝试了其他几种方法,一种使用 numpy.add.reduceat,一种使用 pandas.groupby,一种使用 [=23] =].看来我最初在上面提出的矢量化方法可能是最好的选择。供参考:

from itertools import chain

def reduceat_solution(X, nbrs):
    ind, j = np.transpose([[i, len(v)] for i, v in nbrs.items()])
    i = list(chain(*(nbrs[i] for i in ind)))
    j = np.concatenate([[0], np.cumsum(j)[:-1]])
    return np.add.reduceat(X[i], j)[ind]

np.allclose(original_solution(X, nbrs),
            reduceat_solution(X, nbrs))
# True

-

import pandas as pd

def groupby_solution(X, nbrs):
    i, j = np.transpose([[k, vi] for k, v in nbrs.items() for vi in v])
    return pd.groupby(pd.DataFrame(X[j]), i).sum().values

np.allclose(original_solution(X, nbrs),
            groupby_solution(X, nbrs))
# True

-

from scipy.sparse import csr_matrix
from itertools import chain

def sparse_solution(X, nbrs):
    items = (([i]*len(col), col, [1]*len(col)) for i, col in nbrs.items())
    rows, cols, data = (np.array(list(chain(*a))) for a in zip(*items))
    M = csr_matrix((data, (rows, cols)))
    return M.dot(X)

np.allclose(original_solution(X, nbrs),
            sparse_solution(X, nbrs))
# True

所有时间加在一起:

X, nbrs = make_data(100000)
%timeit original_solution(X, nbrs)
%timeit vectorized_solution(X, nbrs)
%timeit reduceat_solution(X, nbrs)
%timeit groupby_solution(X, nbrs)
%timeit sparse_solution(X, nbrs)
# 1 loop, best of 3: 1.46 s per loop
# 1 loop, best of 3: 268 ms per loop
# 1 loop, best of 3: 416 ms per loop
# 1 loop, best of 3: 657 ms per loop
# 1 loop, best of 3: 282 ms per loop

基于对最近稀疏问题的研究,例如

下面介绍了如何使用稀疏矩阵解决您的问题。该方法可能同样适用于密集的。这个想法是将稀疏 sum 实现为具有 1 的行(或列)的矩阵乘积。稀疏矩阵的索引很慢,但矩阵乘积是好的 C 代码。

在这种情况下,我将构建一个乘法矩阵,其中我想要求和的行为 1 - 字典中的每个条目都有不同的 1 组。

示例矩阵:

In [302]: A=np.arange(8*3).reshape(8,3)    
In [303]: M=sparse.csr_matrix(A)

选择词典:

In [304]: dict={0:[0,1],1:[1,0,2],2:[2,1],3:[3,4,7]}

根据该字典构建一个稀疏矩阵。这可能不是构造此类矩阵的最有效方法,但足以证明这个想法。

In [305]: r,c,d=[],[],[]
In [306]: for i,col in dict.items():
    c.extend(col)
    r.extend([i]*len(col))
    d.extend([1]*len(col))

In [307]: r,c,d
Out[307]: 
([0, 0, 1, 1, 1, 2, 2, 3, 3, 3],
 [0, 1, 1, 0, 2, 2, 1, 3, 4, 7],
 [1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

In [308]: idx=sparse.csr_matrix((d,(r,c)),shape=(len(dict),M.shape[0]))

执行求和并查看结果(作为密集数组):

In [310]: (idx*M).A
Out[310]: 
array([[ 3,  5,  7],
       [ 9, 12, 15],
       [ 9, 11, 13],
       [42, 45, 48]], dtype=int32)

这里是原版,用于对比。

In [312]: M.A
Out[312]: 
array([[ 0,  1,  2],
       [ 3,  4,  5],
       [ 6,  7,  8],
       [ 9, 10, 11],
       [12, 13, 14],
       [15, 16, 17],
       [18, 19, 20],
       [21, 22, 23]], dtype=int32)