python 数百万行的高效欧氏距离计算

Efficient euclidean distance calculation in python for millions of rows

我试图找出两个数据集的元素之间的欧式距离。每个都有数百万个元素。计算欧式距离后,我需要最接近的匹配。考虑到元素的数量,需要几天才能完成

下面是我正在尝试的代码。我还尝试使用距 scipy.spatial 的距离。但即使那样也需要永远

from sklearn.metrics.pairwise import euclidean_distances
df =pd.DataFrame(euclidean_distances(df1,df2))
df.index =  df1.index
df.columns = df2.index
df['min_distance'] = df.min(axis=1)
df['min_distance_id'] = df.idxmin(axis=1)

有没有其他方法可以在更短的时间内获得输出。

你看过 scipy.spatial.cKDTree 了吗?

您可以为您的一个数据集构造此数据结构,并查询它以获取第二个数据集中每个点的距离。

KDTree = scipy.spatial.cKDTree(df1)
distances, indexes = KDTree.query(df2, n_jobs=-1)

我在这里设置 n_jobs=-1 以使用所有可用的处理器。

我使用 numpy 为 2D 点列表编写了这个解决方案。它将快速找到两个点阵列之间最近的一对点。我尝试了两个列表,每个列表都有 1000 万个点,并在大约 4 分钟内得到了答案。每边200万点,只用了42秒。我不知道这是否足以满足您的需求,但它肯定比 "days" 快。如果您也需要,它还可以为更高的维度提供良好的性能。

def closest(A,B):

    def bruteForce(A,B):
        d = None
        swap = A.shape[0] > B.shape[0]
        if swap: A,B = B,A
        for pA in A:
            daB  = np.sum((pA-B)**2,axis=1)
            iMin = np.argmin(daB)
            if d is None or daB[iMin] < d:
                a,b = pA,B[iMin]
                d   = sum((a-b)**2)
        if swap: a,b = b,a
        return a,b,sqrt(d)

    # small sizes are faster using brute force
    if A.shape[0] * B.shape[0] < 1000000 \
    or A.shape[0] < 20 or B.shape[0] < 20:
        return bruteForce(A,B)

    # find center position
    midA  = np.sum(A,axis=0)/A.shape[0]
    midB  = np.sum(B,axis=0)/B.shape[0]
    midAB = (midA+midB)/2

    # closest A to center position
    A2midAB  = np.sum((A-midAB)**2,axis=1)
    iA       = np.argmin(A2midAB)    
    pA       = A[iA]

    # closest B to pA
    B2pA     = np.sum((B-pA)**2,axis=1)
    iB       = np.argmin(B2pA)
    pB       = B[iB]
    dAB      = sqrt(sum((pA-pB)**2))

    # distance of zero is best solution, return immediately
    if dAB == 0: return pA,pB,dAB

    # slope of ptA-ptB segment
    if pA[0] == pB[0]: p,m = 0,1 
    else:              p,m = 1,(pB[1]-pA[1])/(pB[0]-pA[0])

    # perpendicular line intersections with x axis from each point
    xA = m*A[:,1] + p*A[:,0] 
    xB = m*B[:,1] + p*B[:,0]

    # baselines for ptA and ptB
    baseA = xA[iA]
    baseB = xB[iB]
    rightSide = (baseB > baseA) 

    # partitions
    ArightOfA = (xA > baseA) == rightSide
    BrightOfA = (xB > baseA) == rightSide
    AleftOfB  = (xA > baseB) != rightSide
    BleftOfB  = (xB > baseB) != rightSide

    # include pB and exclude pA (we already know its closest point in B)
    ArightOfA[iA] = False
    AleftOfB[iA]  = False
    BleftOfB[iB]  = True
    BrightOfA[iB] = True

    # recurse left side
    if np.any(AleftOfB) and np.any(BleftOfB):
        lA,lB,lD = closest(A[AleftOfB],B[BleftOfB])
        if lD < dAB: pA,pB,dAB = lA,lB,lD

    # resurse right side
    if np.any(ArightOfA) and np.any(BrightOfA):
        rA,rB,rD = closest(A[ArightOfA],B[BrightOfA])
        if rD < dAB: pA,pB,dAB = rA,rB,rD

    return pA,pB,dAB

使用两组随机的 2D 点进行测试,每个点有 1000 万个点:

dimCount = 2
ACount   = 10000000
ASpread  = ACount
BCount   = ACount-1
BSpread  = BCount
A = np.random.random((ACount,dimCount))*ASpread-ASpread/2
B = np.random.random((BCount,dimCount))*BSpread-BSpread/2

a,b,d = closest(A,B)
print("closest points:",a,b,"distance:",d)

# closest points: [-4422004.2963273   2783038.35968559] [-4422004.76974851  2783038.61468366] distance: 0.5377282447465505

它的工作方式是根据战略性 selected 对 (pA,pB) 划分 A 点和 B 点。 pA 和 pB 之间的线用作两个列表的点的分区。然后递归地使用此分区的每一侧来查找其他(更接近的)点对。

在图形上,这对应于基于 pA-pB 线段垂直线的分区:

selecting pA 和 pB 的策略是找到两组点的近似中心,并从列表 A 中选择一个靠近该中心的点 (pA)。然后 select 列表 B 中最接近 pA 的点。这确保两条垂直线之间没有更接近另一个列表中的 pA 或 pB 的点。

A点和B点在垂线两侧的距离必然比pA-pB更远,所以可以将它们隔离成两个sub-lists,分别处理。

这允许 "divide and conquer" 方法大大减少要比较的 point-to-point 距离的数量。

在我的测试中(使用随机分布的点),性能似乎与 A 和 B 中的总点数成线性比例。我尝试通过创建距离较远的点的小簇来扭曲分布(这样就不会点实际上靠近近似中心)并且性能仍然是线性的。我不确定是否有任何 "worst case" 点分布会导致性能下降(我还没有找到)