Find all shortest Euclidean distances between two groups of point coordinates

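I have a Pandas DataFrame in which columns X1, Y1 hold the point coordinates of the first group and columns X2, Y2 hold the point coordinates of the second group. The two groups are independent of each other; they just happen to be in the same DataFrame. Example: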

X1,Y1,X2,Y2
41246.438,0.49,38791.673,0.49
41304.5,0.491,38921.557,0.491
41392.062,0.492,39037.135,0.492
41515.5,0.493,39199.972,0.493
41636.062,0.494,39346.561,0.494
41795.188,0.495,39477.63,0.495
42027.75,0.496,39576.275,0.496
42252.25,0.497,39732.102,0.497
42486.812,0.498,39833.753,0.498
42739.062,0.499,39949.13,0.499
43012.125,0.5,40135.42,0.5
43472.75,0.5,40292.017,0.5
43909.562,0.501,40479.452,0.501
44312.625,0.502,40725.329,0.502
44799.938,0.503,40950.05,0.503
45294.938,0.504,41214.136,0.504
45729.625,0.505,41514.213,0.505
45942.438,0.506,41943.208,0.506
46067.688,0.507,42296.643,0.507
46215,0.508,42653.477,0.508
46336.75,0.509,43138.834,0.509
46476.562,0.51,43557.815,0.51
46584.25,0.511,43966.564,0.511
46654.75,0.512,44166.996,0.512
46707.75,0.513,44310.557,0.513
46774.188,0.514,44410.069,0.514
46832.062,0.515,44518.045,0.515
46905.062,0.516,44608.646,0.516
46976.562,0.517,44678.073,0.517
47077.938,0.518,44727.393,0.518
47215.688,0.519,44786.498,0.519
47290.625,0.52,44845.867,0.52
47351.5,0.521,44915.072,0.521

For each point in columns X1, Y1, I need to find the point in columns X2, Y2 for which the Euclidean distance between the two points is shortest.

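I then need to place the point found in columns X2, Y2 in the same row as the corresponding point in X1, Y1. I also need to add the computed shortest Euclidean distance to that same row, in another column D. This process is then repeated for every point in columns X1, Y1.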

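One approach is to iterate over the rows of columns X1, Y1 and, for each row, find the shortest Euclidean distance in columns X2, Y2. There may be a better way that avoids writing a for loop.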
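For reference, the row-by-row approach described in the question could look roughly like the sketch below. The function name closest_by_loop and the tiny demo frame are made up for illustration; this is only a baseline to compare other approaches against, not a recommended solution.

```python
import numpy as np
import pandas as pd

def closest_by_loop(df: pd.DataFrame) -> pd.DataFrame:
    """For each (X1, Y1) row, scan all (X2, Y2) points and keep the nearest."""
    targets = df[["X2", "Y2"]].to_numpy()
    rows = []
    for x1, y1 in df[["X1", "Y1"]].to_numpy():
        # Euclidean distance from this point to every (X2, Y2) point
        dists = np.hypot(targets[:, 0] - x1, targets[:, 1] - y1)
        j = int(dists.argmin())
        rows.append((x1, y1, targets[j, 0], targets[j, 1], dists[j]))
    return pd.DataFrame(rows, columns=["X1", "Y1", "X2", "Y2", "D"])

# Hypothetical demo data, not the data from the question
demo = pd.DataFrame({"X1": [0.0, 10.0], "Y1": [0.0, 0.0],
                     "X2": [9.0, 1.0], "Y2": [0.0, 0.0]})
out = closest_by_loop(demo)
print(out)
```

The loop does one vectorized pass over the second group per row, so the total work grows with the product of the two group sizes.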

Solution


Use Faiss.

pip install faiss

Instead of IndexFlatL2 you can use IndexIVFFlat, which is somewhat faster but returns approximate results.

import faiss
import numpy as np
import pandas as pd
def get_closest(df: pd.DataFrame)->pd.DataFrame:
    d = 2 #  dimensionality

    xb = np.float32(df[["X2","Y2"]].values)
    xb = np.ascontiguousarray(xb)
    
    xq = np.float32(df[["X1","Y1"]].values)
    xq = np.ascontiguousarray(xq)

    index = faiss.IndexFlatL2(d) #  build the index
    index.add(xb)                #  add vectors to the index
    
    D, I = index.search(xq, 1)     # actual search
    
    res_df = df[["X1","Y1"]]
    res_df[["X2","Y2"]] = df[["X2","Y2"]].iloc[I[:,0]].reset_index(drop = True)
    res_df["distance"] = np.sqrt(D[:,0])  # IndexFlatL2 returns squared L2 distances
    return res_df

get_closest(df)

Performance


For 1e4 (x, y) pairs in each group, run time:

371 ms ± 58.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For 1e5 vectors:

33.9 s ± 3.55 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

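This should be similar to generating the full distance matrix with SciPy or NumPy, but it is more memory-efficient and requires no further search over that matrix.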

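Notes


  1. In the function above, res_df is taken as a slice of df, which is not recommended because changes you make to res_df may affect df (pandas typically warns about this with SettingWithCopyWarning). This was done to reduce memory usage; if you want to avoid unpredictable behavior, make a copy instead.
  2. If you need more than one neighbor per point, this is easy to achieve with Faiss with minimal modification (the second argument of index.search is the number of neighbors).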

Alternatives


Using KDTree

import pandas as pd
from scipy.spatial import KDTree
def get_closest(df: pd.DataFrame)->pd.DataFrame:
    tree = KDTree(df[["X2", "Y2"]].values)  # index the candidate points (second group)
    dist, ind = tree.query(df[["X1", "Y1"]].values, k=1)  # k = desired number of neighbors
    res_df = df[["X1","Y1"]]
    res_df[["X2","Y2"]] = df[["X2","Y2"]].iloc[ind].reset_index(drop = True)
    res_df["distance"] = dist
    return res_df
get_closest(df)

For 1e4 (x, y) pairs in each group, run time:

1.43 s ± 55 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For 1e5 (x, y) pairs in each group, run time:

17 s ± 767 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
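The k argument of KDTree.query controls how many neighbors are returned per query point. As a minimal sketch on made-up points (the arrays group1 and group2 are hypothetical, not the question's data), asking for k=2 gives two-column distance and index arrays, sorted from nearest to farthest:

```python
import numpy as np
from scipy.spatial import KDTree

# Hypothetical points for illustration
group1 = np.array([[0.0, 0.0], [10.0, 0.0]])             # query points (X1, Y1)
group2 = np.array([[1.0, 0.0], [4.0, 0.0], [9.0, 0.0]])  # candidates (X2, Y2)

tree = KDTree(group2)                # build the tree on the candidate points
dist, ind = tree.query(group1, k=2)  # both results have shape (len(group1), 2)

print(ind)   # column 0: nearest candidate index, column 1: second nearest
print(dist)  # matching Euclidean distances
```

With k=1 the same call returns one-dimensional arrays, which is why the single-neighbor function above can assign dist directly to a column.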

Using cdist, as proposed by @Dimon

df[['X2','Y2']] = \
  df[['X2','Y2']].iloc[np.argmin(cdist(df[['X1','Y1']], df[['X2','Y2']],
  metric='euclidean' ), axis=1),:].copy().reset_index(drop=True)
df['D'] = np.linalg.norm(df[['X1','Y1']].values - df[['X2','Y2']].values, axis=1)

For 1e4 (x, y) pairs in each group, run time:

543 ms ± 112 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For 1e5 (x, y) pairs in each group, run time:

MemoryError: Unable to allocate 74.5 GiB for an array with shape (100000, 100000) and data type float64
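The MemoryError comes from materializing the full 100000 x 100000 distance matrix at once. One possible workaround, sketched here as an assumption and not part of @Dimon's proposal, is to call cdist on blocks of query rows and keep only the per-row minimum. The helper name nearest_chunked and its chunk parameter are made up for illustration.

```python
import numpy as np
from scipy.spatial.distance import cdist

def nearest_chunked(a: np.ndarray, b: np.ndarray, chunk: int = 1000):
    """For each row of a, return (index of the nearest row in b, distance)."""
    idx = np.empty(len(a), dtype=np.intp)
    dist = np.empty(len(a), dtype=float)
    for start in range(0, len(a), chunk):
        # Only a (<= chunk, len(b)) block of distances exists at any time
        block = cdist(a[start:start + chunk], b)
        nearest = block.argmin(axis=1)
        idx[start:start + chunk] = nearest
        dist[start:start + chunk] = block[np.arange(len(nearest)), nearest]
    return idx, dist

# Small random check against the full matrix
rng = np.random.default_rng(0)
a = rng.random((25, 2))
b = rng.random((40, 2))
idx, dist = nearest_chunked(a, b, chunk=7)
```

Peak memory for the distance block is roughly chunk x len(b) x 8 bytes, so the chunk size can be tuned to the available RAM. The run time still scales with the product of the two group sizes.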

Using NumPy, as proposed by @Valdi_Bo

diffs = df.iloc[:, 2:].values[np.newaxis, :, :]\
    - df.iloc[:, :2].values[:, np.newaxis, :]
diffs2 = (diffs ** 2).sum(axis=2)
result = pd.Series(np.sqrt(diffs2.min(axis=0)), name='minDist')
diffs2.argmin(axis=0)

For 1e4 (x, y) pairs in each group, run time:

1.6 s ± 82.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For 1e5 (x, y) pairs in each group, run time:

MemoryError: Unable to allocate 149. GiB for an array with shape (100000, 100000, 2) and data type float64

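I will show you how to compute the result using only NumPy.

The first step is to compute the differences along each coordinate between every "X1 / Y1" point and every "X2 / Y2" point: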

diffs = df.iloc[:, 2:].values[np.newaxis, :, :]\
    - df.iloc[:, :2].values[:, np.newaxis, :]

Then square these differences and sum them (for each pair of points):

diffs2 = (diffs ** 2).sum(axis=2)

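The last step is to compute the result:

  • find the minimum squared distance for each "X2 / Y2" point,
  • take its square root (for each point),
  • convert the result to a Series.

The code to do this is: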

result = pd.Series(np.sqrt(diffs2.min(axis=0)), name='minDist')

Additionally, if you want to know which "X1 / Y1" point is closest to a given "X2 / Y2" point, run:

diffs2.argmin(axis=0)

For your data, the result is:

array([ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  3,
        6,  7,  9, 10, 11, 12, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14],
      dtype=int64)
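The steps above reduce along axis=0, so they answer "which X1 / Y1 point is nearest to each X2 / Y2 point". The question asks for the opposite direction, which the same squared-distance matrix can provide by reducing along axis=1. The following is a sketch on a made-up frame (demo, p1, p2 and nearest are illustrative names, not from the original answer):

```python
import numpy as np
import pandas as pd

# Hypothetical demo data, not the data from the question
demo = pd.DataFrame({"X1": [0.0, 10.0, 5.0], "Y1": [0.0, 0.0, 0.0],
                     "X2": [9.0, 1.0, 6.0], "Y2": [0.0, 0.0, 0.0]})

p1 = demo[["X1", "Y1"]].to_numpy()
p2 = demo[["X2", "Y2"]].to_numpy()

# diffs2[i, j] = squared distance between X1/Y1 point i and X2/Y2 point j
diffs2 = ((p2[np.newaxis, :, :] - p1[:, np.newaxis, :]) ** 2).sum(axis=2)
nearest = diffs2.argmin(axis=1)  # best X2/Y2 index for each X1/Y1 point

result = pd.DataFrame({
    "X1": p1[:, 0], "Y1": p1[:, 1],
    "X2": p2[nearest, 0], "Y2": p2[nearest, 1],
    "D": np.sqrt(diffs2[np.arange(len(p1)), nearest]),
})
print(result)
```

Each output row pairs an X1 / Y1 point with its nearest X2 / Y2 point and the distance D, which is the layout requested in the question.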

Here is an alternative solution based on cdist:

from io import StringIO
import pandas as pd
import numpy as np
from scipy.spatial.distance import cdist

df = \
'''
X1,Y1,X2,Y2
41246.438,0.49,38791.673,0.49
41304.5,0.491,38921.557,0.491
41392.062,0.492,39037.135,0.492
41515.5,0.493,39199.972,0.493
41636.062,0.494,39346.561,0.494
41795.188,0.495,39477.63,0.495
42027.75,0.496,39576.275,0.496
42252.25,0.497,39732.102,0.497
42486.812,0.498,39833.753,0.498
42739.062,0.499,39949.13,0.499
43012.125,0.5,40135.42,0.5
43472.75,0.5,40292.017,0.5
43909.562,0.501,40479.452,0.501
44312.625,0.502,40725.329,0.502
44799.938,0.503,40950.05,0.503
45294.938,0.504,41214.136,0.504
45729.625,0.505,41514.213,0.505
45942.438,0.506,41943.208,0.506
46067.688,0.507,42296.643,0.507
46215,0.508,42653.477,0.508
46336.75,0.509,43138.834,0.509
46476.562,0.51,43557.815,0.51
46584.25,0.511,43966.564,0.511
46654.75,0.512,44166.996,0.512
46707.75,0.513,44310.557,0.513
46774.188,0.514,44410.069,0.514
46832.062,0.515,44518.045,0.515
46905.062,0.516,44608.646,0.516
46976.562,0.517,44678.073,0.517
47077.938,0.518,44727.393,0.518
47215.688,0.519,44786.498,0.519
47290.625,0.52,44845.867,0.52
47351.5,0.521,44915.072,0.521
'''

df = pd.read_csv(StringIO(df), sep=",")
print(df)

df[['X2','Y2']] = \
  df[['X2','Y2']].iloc[np.argmin(cdist(df[['X1','Y1']], df[['X2','Y2']],
  metric='euclidean' ), axis=1),:].copy().reset_index(drop=True)
df['D'] = np.linalg.norm(df[['X1','Y1']].values - df[['X2','Y2']].values, axis=1)
print(df)