计算 N 个样本和聚类质心之间的平方欧氏距离的最有效方法是什么?

What is the most efficient way to compute the square euclidean distance between N samples and clusters centroids?

我正在寻找一种有效的方法(没有 for 循环)来计算一组样本和一组簇质心之间的欧氏距离。

示例:

import numpy as np
X = np.array([[1,2,3],[1, 1, 1],[0, 2, 0]])
y = np.array([[1,2,3], [0, 1, 0]])

预期输出:

array([[ 0., 11.],
       [ 5.,  2.],
       [10.,  1.]])

这是 X 中的每个样本与 y 中的每个质心之间的平方欧氏距离。

我想到了 2 个解决方案:

解决方案 1:

def dist_2(X,y):
    X_square_sum = np.sum(np.square(X), axis = 1)
    y_square_sum = np.sum(np.square(y), axis = 1)
    dot_xy = np.dot(X, y.T)
    X_square_sum_tile = np.tile(X_square_sum.reshape(-1, 1), (1, y.shape[0]))
    y_square_sum_tile = np.tile(y_square_sum.reshape(1, -1), (X.shape[0], 1))
    dist = X_square_sum_tile + y_square_sum_tile - (2 * dot_xy)
    return dist

dist = dist_2(X, y)

解决方案 2:

import scipy
dist = scipy.spatial.distance.cdist(X,y)**2

两个解决方案的性能(挂钟时间)

import time
X = np.random.random((100000, 50))
y = np.random.random((100, 50))

start = time.time()
dist = scipy.spatial.distance.cdist(X,y)**2
end = time.time()
print (end - start)

平均经过的挂钟时间 = 0.7 秒

start = time.time()
dist = dist_2(X,y)
end = time.time()
print (end - start)

平均经过的挂钟时间 = 0.3 秒

大量质心测试

X = np.random.random((100000, 50))
y = np.random.random((1000, 50))

挂钟平均耗时 "solution 1" = 50 秒(+ 内存问题)

挂钟平均耗时 "solution 2" = 6 秒!!!

结论

似乎“解决方案 1 在平均挂钟时间(在小数据集上)方面比 "solution 2" 更有效,但在内存方面效率低下。

有什么建议吗?

此问题经常与近邻搜索结合使用。如果是这种情况,请查看 kdtree approach。这将比计算欧几里德距离更有效,无论是内存消耗还是性能。

如果不是这种情况,这里有一些可能的方法。前两个来自。第三个使用 Numba 进行 jit 编译。与前两个版本的主要区别在于他避免了临时数组。

计算欧氏距离的三种方法

import numpy as np
import numba as nb

# @Paul Panzer
#
def outer_sum_dot_app(A,B):
    return np.add.outer((A*A).sum(axis=-1), (B*B).sum(axis=-1)) - 2*np.dot(A,B.T)

# @Divakar
#
def outer_einsum_dot_app(A,B):
    return np.einsum('ij,ij->i',A,A)[:,None] + np.einsum('ij,ij->i',B,B) - 2*np.dot(A,B.T)

@nb.njit()
def calc_dist(A,B,sqrt=False):
  dist=np.dot(A,B.T)

  TMP_A=np.empty(A.shape[0],dtype=A.dtype)
  for i in range(A.shape[0]):
    sum=0.
    for j in range(A.shape[1]):
      sum+=A[i,j]**2
    TMP_A[i]=sum

  TMP_B=np.empty(B.shape[0],dtype=A.dtype)
  for i in range(B.shape[0]):
    sum=0.
    for j in range(B.shape[1]):
      sum+=B[i,j]**2
    TMP_B[i]=sum

  if sqrt==True:
    for i in range(A.shape[0]):
      for j in range(B.shape[0]):
        dist[i,j]=np.sqrt(-2.*dist[i,j]+TMP_A[i]+TMP_B[j])
  else:
    for i in range(A.shape[0]):
      for j in range(B.shape[0]):
        dist[i,j]=-2.*dist[i,j]+TMP_A[i]+TMP_B[j]
  return dist

计时

A = np.random.randn(10000,3)
B = np.random.randn(10000,3)

#calc_dist:                      360ms first call excluded due to compilation overhead
#outer_einsum_dot_app (Divakar): 1150ms
#outer_sum_dot_app (Paul Panzer):1590ms
#dist_2:                         1840ms

A = np.random.randn(1000,100)
B = np.random.randn(1000,100)

#calc_dist:                      4.3  ms first call excluded due to compilation overhead
#outer_einsum_dot_app (Divakar): 13.12ms
#outer_sum_dot_app (Paul Panzer):13.2 ms
#dist_2:                         21.3ms