点的成对距离,由关联稀疏矩阵指定的对

Pair-wise distance of points, with pair specified by incidence sparse matrix

在我的代码中,我有一个点位置数组和一个稀疏关联矩阵。对于矩阵的每个非零 ij 元素,我想计算 i 点和 j 点之间的距离。

目前我正在使用 'nonzero()' 提取索引,但是此方法对输出索引进行排序,并且此排序占用了我整个应用程序的大部分执行时间。

import numpy as np
import scipy as sc
import scipy.sparse

#number of points
n = 100000 


#point positions
pos = np.random.rand(n,3)

#building incidence matrix, defining the pair wise calculation
density = 1e-3
n_entries = int(n**2*density)
indx = np.random.randint(0,n, n_entries)
indy = np.random.randint(0,n, n_entries)

Pairs = scipy.sparse.csc_matrix((np.ones(n_entries) ,(indx,indy)) , (n,n))



#current method
ii,jj = Pairs.nonzero() #this part is slow

d = np.linalg.norm(pos[ii] - pos[jj], axis = 1)

distance_matrix = scipy.sparse.csr_matrix((d , (ii,jj)) , (n,n))

修改后的答案

主要问题是 indx, indy 中有时会出现重复。因此,我们不能简单地做:

d2 = np.linalg.norm(pos[indx] - pos[indy], axis=1)
distance_matrix2 = scipy.sparse.csr_matrix((d2, (indx, indy)), (n, n))

...因为重复索引处的值将乘以重复项的数量。

下面的原答案是先去重索引。但即使使用更快的 pd_unique2_idx,整个操作最终还是比原来花费更多的时间。

相反,由于您已经计算了 Pairs(如果您注意到,它也存在重复索引问题:Pairs.max() 通常给出超过 1.0),让我们尝试获取非- 没有任何排序的零值,只使用结构中的内容。这是一种方法,遵循 indptrindices in the docs:

的定义
ii = Pairs.indices
jj = np.repeat(np.arange(len(Pairs.indptr) - 1), np.diff(Pairs.indptr))

您得到的索引与您自己的索引相同 ii, jj,但排序不同(列优先而不是行优先)。

时机

只看得到ii, jj的部分:

%timeit Pairs.nonzero()
# 1.27 s ± 5.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit np.repeat(np.arange(len(Pairs.indptr) - 1), np.diff(Pairs.indptr))
# 41.3 ms ± 49.7 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

在上下文中:

def orig(pos, Pairs, n):
    #current method
    ii, jj = Pairs.nonzero() #this part is slow
    d = np.linalg.norm(pos[ii] - pos[jj], axis=1)
    distance_matrix = scipy.sparse.csr_matrix((d, (ii, jj)), (n, n))
    return distance_matrix

def modified(pos, Pairs, n):
    # only change: these two lines:
    ii = Pairs.indices
    jj = np.repeat(np.arange(len(Pairs.indptr) - 1), np.diff(Pairs.indptr))
    # unchanged
    d = np.linalg.norm(pos[ii] - pos[jj], axis=1)
    distance_matrix = scipy.sparse.csr_matrix((d, (ii, jj)), (n, n))
    return distance_matrix

关于您的数据(从 np.random.seed(0) 开始,以获得可重复的结果):

%timeit orig(pos, Pairs, n)
# 2.16 s ± 15.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit modified(pos, Pairs, n)
# 1.11 s ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

正确性

dm = orig(pos, Pairs, n)
dmod = modified(pos, Pairs, n)

np.all(np.c_[dm.nonzero()] == np.c_[dmod.nonzero()])
# True

i, j = dm.nonzero()
np.all(dm[i, j] == dmod[i, j])
# True

原回答

您可以按 indx, indy 顺序计算另一个距离列表,然后制作一个稀疏矩阵:

d2 = np.linalg.norm(pos[indx] - pos[indy], axis=1)
distance_matrix2 = scipy.sparse.csr_matrix((d2, (indx, indy)), (n, n))

然而,与您自己的距离矩阵的比较显示了一些罕见但可能的差异:它们来自您在 indx, indy 中重复位置的情况,相应的 Pairs.toarray()[indx, indy] 是重复次数。这源于 csccsr 稀疏矩阵的一个众所周知的“特征”,即对重复元素求和。例如。 or .

例如:

>>> indx = np.array([0,1,0])
>>> indy = np.array([2,0,2])
>>> scipy.sparse.csc_matrix((np.ones(3), (indx, indy))).toarray()
array([[0., 0., 2.],
       [1., 0., 0.]])

不幸的是,@hpaulj 在上面链接的帖子中使用 todok() 提出的解决方案不再适用于最新版本的 scipy。

使用 np.unique() 很慢,因为它对值进行排序。 pd.unique() 更快(无排序),但只接受一维数组。这是一种快速执行 2D 唯一性的方法。请注意,我们需要唯一行的索引,以便我们可以索引 indxindy,而且还需要距离矩阵(如果已经计算的话)。

def pd_unique2_idx(indx, indy):
    df = pd.DataFrame(np.c_[indx, indy])
    return np.nonzero(~df.duplicated().values)


# by contrast (slower on long sequences, because it sorts)
def np_unique2_idx(indx, indy):
    rc = np.vstack([indx, indy]).T.copy()
    dt = rc.dtype.descr * 2
    i = np.unique(rc.view(dt), return_index=True)[1]
    return i

indxindy 的 1M 元素上,我看到:

  • pd_unique2_idx:每个循环 52.1 毫秒 ± 529 微秒
  • np_unique2_idx:每个循环 972 毫秒 ± 19.5 毫秒

这个怎么用?按照您的设置:

i = pd_unique2_idx(indx, indy)
indx = indx[i]
indy = indy[i]
d2 = np.linalg.norm(pos[indx] - pos[indy], axis=1)
distance_matrix2 = scipy.sparse.csr_matrix((d2, (indx, indy)), (n, n))

使用您的示例数组:

In [104]: Pairs = sparse.csc_matrix((np.ones(n_entries) ,(indx,indy)) , (n,n))

创建 csc 本身的时间并不简单:

In [105]: timeit Pairs = sparse.csc_matrix((np.ones(n_entries) ,(indx,indy)) , (
     ...: n,n))
1.08 s ± 44.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

nonzero 将矩阵转换为 coo,然后对 0 进行额外检查:

def nonzero(self):
    A = self.tocoo()
    nz_mask = A.data != 0
    return (A.row[nz_mask], A.col[nz_mask])

coo转换比较快。我认为不涉及任何排序。

In [106]: timeit Pairs.tocoo()
221 ms ± 17 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

额外的掩蔽确实会增加时间。

In [107]: timeit Pairs.nonzero()
1.86 s ± 37.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

由于新制作的 csc 不会有任何明确的 0,您可以这样做:

M = Pairs.tocoo()
ii,jj = M.row, M.col

直接创建 coo 矩阵更快:

In [108]: timeit sparse.coo_matrix((np.ones(n_entries) ,(indx,indy)) , (n,n))
158 ms ± 15.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

这会将 rowcol 直接设置为 indxindy(或最多更改 dtype)。它是 csc 对索引进行排序的创建(按行和列的词法),并进行任何重复的求和。