在没有 for 循环的情况下对 Numpy 数组进行连续一对一计算的快速方法?

Fast way to do consecutive one-to-all calculations on Numpy arrays without a for-loop?

我正在研究一个优化问题,但为了避免陷入细节,我将提供一个简单的错误示例,这个错误已经让我头疼了几天。

假设我有一个带有观察到的 x-y 坐标的 2D numpy 数组:

from scipy.optimize import distance
x = np.array([1,2], [2,3], [4,5], [5,6])

我还有一个 x-y 坐标列表来与这些点 (y) 进行比较:

y = np.array([11,13], [12, 14])

我有一个函数,它计算 x 的值与 y 中的所有值之间的曼哈顿差异之和:

def find_sum(ref_row, comp_rows):
    modeled_counts = []
    y = ref_row * len(comp_rows)
    res = list(map(distance.cityblock, ref_row, comp_rows))
    modeled_counts.append(sum(res))

    return sum(modeled_counts)

本质上,我想做的是找到 y 中每个项目与 x 中每个项目的曼哈顿距离之和(所以基本上对于 x 中的每个项目,找到总和该 (x,y) 对与 y 中的每个 (x,y) 对之间的曼哈顿距离。

我已经使用以下代码行对此进行了尝试:

z = list(map(find_sum, x, y))

然而,z 的长度为 2(如 y),而不是如 x 的 4。有没有办法保证z是连续的one-to-all计算的结果呢?也就是说,我想计算 x[0]y 中每个集合之间的所有曼哈顿差异之和,依此类推,因此 z 的长度应该等于 x.

的长度

有没有不用 for 循环的简单方法?我的数据相当大(约 400 万行),所以我非常感谢快速解决方案。我对 Python 编程还很陌生,所以任何关于为什么该解决方案有效且快速的解释也将不胜感激,但绝对不是必需的!

谢谢!

此解决方案实现了 numpy 中的距离,因为我认为它是 broadcasting 的一个很好的示例,如果您需要使用数组和矩阵,了解这一点非常有用。

根据曼哈顿距离的定义,您需要评估每列之间的差异的绝对值之和。但是,xx[:, 0] 的第一列形状为 (4,),yy[:, 0] 的第一列形状为 (2,),因此它们在应用减法的意义上不兼容:广播 属性 表示每个形状都从尾随维度开始进行比较,并且当两个维度相等或其中之一为 1 时,两个维度是兼容的。可悲的是,none 其中对您的专栏是正确的。

但是,您可以使用 np.newaxis 添加一个值为 1 的新维度,因此

x[:, 0]

array([1, 2, 4, 5]),但是

x[:, 0, np.newaxis]

array([[1],
       [2],
       [4],
       [5]])

它的形状是(4 ,1)。现在,形状为 (4, 1) 的矩阵减去形状为 2 的数组得到形状为 (4, 2) 的矩阵,通过 numpy 的广播处理:

   4 x 1
       2
=  4 x 2

您可以获得每列的差异:

first_column_difference = x[:, 0, np.newaxis] - y[:, 0]
second_column_difference = x[:, 1, np.newaxis] - y[:, 1]

并计算它们的绝对值之和:

np.abs(first_column_difference) + np.abs(second_column_difference)

这导致 (4, 2) 矩阵。现在,您想要对每一行的值求和,以便您有 4 个值:

np.sum(np.abs(first_column_difference) + np.abs(second_column_difference), axis=1)

结果是 array([73, 69, 61, 57])。规则很简单:参数 axis 将从结果中消除该维度,因此对 (4, 2) 矩阵使用 axis=1 会生成 4 个值——如果使用 axis=0,它将生成 2 个值。

那么,这将解决您的问题:

x = np.array([[1, 2], [2, 3], [4, 5], [5, 6]])
y = np.array([[11, 13], [12, 43]])

first_column_difference = x[:, 0, np.newaxis] - y[:, 0]
second_column_difference = x[:, 1, np.newaxis] - y[:, 1]
z = np.abs(first_column_difference) + np.abs(second_column_difference)
print(np.sum(z, axis=1))

您也可以跳过每一列的中间步骤并立即评估所有内容(这有点难以理解,所以我更喜欢上面描述的方法来解释发生了什么):

print(np.abs(x[:, np.newaxis] - y).sum(axis=(1, 2)))

这是n维曼哈顿距离的一般情况:如果x是(u, n)并且y是(v, n),它通过广播生成u行(u, 1, n) 通过 (v, n) = (u, v, n),然后应用 sum 消除第二个和第三个轴。

这里是使用 numpy 广播的简单解释的方法

调整广播形状

import numpy as np

start_points = np.array([[1,2], [2,3], [4,5], [5,6]])
dest_points = np.array([[11,13], [12, 14]])

## using np.newaxis as index add a new dimension at that position
## : give all the elements on that dimension
start_points = start_points[np.newaxis, :, :]
dest_points = dest_points[:, np.newaxis, :]

## Now lets check he shape of the point arrays
print('start_points.shape: ', start_points.shape) # (1, 4, 2)
print('dest_points.shape', dest_points.shape) # (2, 1, 2)

让我们试着理解

  • 形状的最后一个元素表示一个点的 x 和 y,大小为 2
  • 我们可以认为start_points有1行4列的点
  • 我们可以认为dest_points有2行1列的点

我们可以将 start_pointsdest_points 视为矩阵或大小为 (1X4) 和 (2X1) 的点的 table 我们清楚地看到尺寸不兼容。如果我们进行算术运算会发生什么 他们之间的操作?这是 numpy 的一个智能部分,称为广播。

  • 它将重复 start_points 的行以匹配 dest_point 的行,从而制作 (2X4)
  • 的矩阵
  • 它将重复 dest_point 的列以匹配 start_points 的列,从而制作 (2X4)
  • 的矩阵
  • 结果是start_points和dest_points上的每对元素之间的算术运算

计算距离

diff_x_y = start_points - dest_points
print(diff_x_y.shape) # (2, 4, 2)
abs_diff_x_y = np.abs(start_points - dest_points)
man_distance = np.sum(abs_diff_x_y, axis=2)
print('man_distance:\n', man_distance)
sum_distance = np.sum(man_distance, axis=0)
print('sum_distance:\n', sum_distance)

单线

start_points = np.array([[1,2], [2,3], [4,5], [5,6]])
dest_points = np.array([[11,13], [12, 14]])
np.sum(np.abs(start_points[np.newaxis, :, :] - dest_points[:, np.newaxis, :]), axis=(0,2))

这里有更多的细节explanation of broadcasting如果你想更了解它

对于如此多的行,您可以通过使用智能算法来节省大量资金。为简单起见,让我们假设只有一个维度;一旦我们建立了算法,回到一般情况就是对坐标求和的简单问题。

朴素算法是 O(mn),其中 m,n 是集合 X,Y 的大小。我们的算法是 O((m+n)log(m+n)),因此它的扩展性更好。

我们首先必须按坐标对XY的并集进行排序,然后在Y上形成cumsum。接下来,我们为每个 x in X 找到其左侧 y in Y 的编号 YbefX,并使用它来查找相应的 cumsum 项目 YbefXvalx左边所有y的距离之和是YbefX乘以x的坐标减去YbefXval,所有y到右边是所有 y 坐标的总和减去 YbefXval 减去 n - YbefX 乘以 x.

的坐标

储蓄从何而来?排序坐标使我们能够循环使用之前所做的求和,而不是每次都从头开始。这利用了这样一个事实,即直到一个符号,我们总是对相同的 y 坐标求和,并且从左到右,符号一个一个地翻转。

代码:

import numpy as np
from scipy.spatial.distance import cdist
from timeit import timeit

def pp(X,Y):
    (m,k),(n,k) = X.shape,Y.shape
    XY = np.concatenate([X.T,Y.T],1)
    idx = XY.argsort(1)
    Xmsk = idx<m
    Ymsk = ~Xmsk
    Xidx = np.arange(k)[:,None],idx[Xmsk].reshape(k,m)
    Yidx = np.arange(k)[:,None],idx[Ymsk].reshape(k,n)
    YbefX = Ymsk.cumsum(1)[Xmsk].reshape(k,m)
    YbefXval = XY[Yidx].cumsum(1)[np.arange(k)[:,None],YbefX-1]
    YbefXval[YbefX==0] = 0
    XY[Xidx] = ((2*YbefX-n)*XY[Xidx]) - 2*YbefXval + Y.sum(0)[:,None]
    return XY[:,:m].sum(0)

def summed_cdist(X,Y):
    return cdist(X,Y,"minkowski",p=1).sum(1)

# demo    
m,n,k = 1000,500,10
X,Y = np.random.randn(m,k),np.random.randn(n,k)
print("same result:",np.allclose(pp(X,Y),summed_cdist(X,Y)))
print("sort       :",timeit(lambda:pp(X,Y),number=1000),"ms")
print("scipy cdist:",timeit(lambda:summed_cdist(X,Y),number=100)*10,"ms")

示例 运行,将智能算法 "sort" 与使用 cdist 库函数实现的朴素算法进行比较:

same result: True
sort       : 1.4447695480193943 ms
scipy cdist: 36.41934019047767 ms