python 数百万行的高效欧氏距离计算
Efficient euclidean distance calculation in python for millions of rows
我试图找出两个数据集的元素之间的欧式距离。每个都有数百万个元素。计算欧式距离后,我需要最接近的匹配。考虑到元素的数量,需要几天才能完成
下面是我正在尝试的代码。我还尝试使用距 scipy.spatial 的距离。但即使那样也需要永远
from sklearn.metrics.pairwise import euclidean_distances
df =pd.DataFrame(euclidean_distances(df1,df2))
df.index = df1.index
df.columns = df2.index
df['min_distance'] = df.min(axis=1)
df['min_distance_id'] = df.idxmin(axis=1)
有没有其他方法可以在更短的时间内获得输出。
你看过 scipy.spatial.cKDTree
了吗?
您可以为您的一个数据集构造此数据结构,并查询它以获取第二个数据集中每个点的距离。
KDTree = scipy.spatial.cKDTree(df1)
distances, indexes = KDTree.query(df2, n_jobs=-1)
我在这里设置 n_jobs=-1
以使用所有可用的处理器。
我使用 numpy 为 2D 点列表编写了这个解决方案。它将快速找到两个点阵列之间最近的一对点。我尝试了两个列表,每个列表都有 1000 万个点,并在大约 4 分钟内得到了答案。每边200万点,只用了42秒。我不知道这是否足以满足您的需求,但它肯定比 "days" 快。如果您也需要,它还可以为更高的维度提供良好的性能。
def closest(A,B):
def bruteForce(A,B):
d = None
swap = A.shape[0] > B.shape[0]
if swap: A,B = B,A
for pA in A:
daB = np.sum((pA-B)**2,axis=1)
iMin = np.argmin(daB)
if d is None or daB[iMin] < d:
a,b = pA,B[iMin]
d = sum((a-b)**2)
if swap: a,b = b,a
return a,b,sqrt(d)
# small sizes are faster using brute force
if A.shape[0] * B.shape[0] < 1000000 \
or A.shape[0] < 20 or B.shape[0] < 20:
return bruteForce(A,B)
# find center position
midA = np.sum(A,axis=0)/A.shape[0]
midB = np.sum(B,axis=0)/B.shape[0]
midAB = (midA+midB)/2
# closest A to center position
A2midAB = np.sum((A-midAB)**2,axis=1)
iA = np.argmin(A2midAB)
pA = A[iA]
# closest B to pA
B2pA = np.sum((B-pA)**2,axis=1)
iB = np.argmin(B2pA)
pB = B[iB]
dAB = sqrt(sum((pA-pB)**2))
# distance of zero is best solution, return immediately
if dAB == 0: return pA,pB,dAB
# slope of ptA-ptB segment
if pA[0] == pB[0]: p,m = 0,1
else: p,m = 1,(pB[1]-pA[1])/(pB[0]-pA[0])
# perpendicular line intersections with x axis from each point
xA = m*A[:,1] + p*A[:,0]
xB = m*B[:,1] + p*B[:,0]
# baselines for ptA and ptB
baseA = xA[iA]
baseB = xB[iB]
rightSide = (baseB > baseA)
# partitions
ArightOfA = (xA > baseA) == rightSide
BrightOfA = (xB > baseA) == rightSide
AleftOfB = (xA > baseB) != rightSide
BleftOfB = (xB > baseB) != rightSide
# include pB and exclude pA (we already know its closest point in B)
ArightOfA[iA] = False
AleftOfB[iA] = False
BleftOfB[iB] = True
BrightOfA[iB] = True
# recurse left side
if np.any(AleftOfB) and np.any(BleftOfB):
lA,lB,lD = closest(A[AleftOfB],B[BleftOfB])
if lD < dAB: pA,pB,dAB = lA,lB,lD
# resurse right side
if np.any(ArightOfA) and np.any(BrightOfA):
rA,rB,rD = closest(A[ArightOfA],B[BrightOfA])
if rD < dAB: pA,pB,dAB = rA,rB,rD
return pA,pB,dAB
使用两组随机的 2D 点进行测试,每个点有 1000 万个点:
dimCount = 2
ACount = 10000000
ASpread = ACount
BCount = ACount-1
BSpread = BCount
A = np.random.random((ACount,dimCount))*ASpread-ASpread/2
B = np.random.random((BCount,dimCount))*BSpread-BSpread/2
a,b,d = closest(A,B)
print("closest points:",a,b,"distance:",d)
# closest points: [-4422004.2963273 2783038.35968559] [-4422004.76974851 2783038.61468366] distance: 0.5377282447465505
它的工作方式是根据战略性 selected 对 (pA,pB) 划分 A 点和 B 点。 pA 和 pB 之间的线用作两个列表的点的分区。然后递归地使用此分区的每一侧来查找其他(更接近的)点对。
在图形上,这对应于基于 pA-pB 线段垂直线的分区:
selecting pA 和 pB 的策略是找到两组点的近似中心,并从列表 A 中选择一个靠近该中心的点 (pA)。然后 select 列表 B 中最接近 pA 的点。这确保两条垂直线之间没有更接近另一个列表中的 pA 或 pB 的点。
A点和B点在垂线两侧的距离必然比pA-pB更远,所以可以将它们隔离成两个sub-lists,分别处理。
这允许 "divide and conquer" 方法大大减少要比较的 point-to-point 距离的数量。
在我的测试中(使用随机分布的点),性能似乎与 A 和 B 中的总点数成线性比例。我尝试通过创建距离较远的点的小簇来扭曲分布(这样就不会点实际上靠近近似中心)并且性能仍然是线性的。我不确定是否有任何 "worst case" 点分布会导致性能下降(我还没有找到)
我试图找出两个数据集的元素之间的欧式距离。每个都有数百万个元素。计算欧式距离后,我需要最接近的匹配。考虑到元素的数量,需要几天才能完成
下面是我正在尝试的代码。我还尝试使用距 scipy.spatial 的距离。但即使那样也需要永远
from sklearn.metrics.pairwise import euclidean_distances
df =pd.DataFrame(euclidean_distances(df1,df2))
df.index = df1.index
df.columns = df2.index
df['min_distance'] = df.min(axis=1)
df['min_distance_id'] = df.idxmin(axis=1)
有没有其他方法可以在更短的时间内获得输出。
你看过 scipy.spatial.cKDTree
了吗?
您可以为您的一个数据集构造此数据结构,并查询它以获取第二个数据集中每个点的距离。
KDTree = scipy.spatial.cKDTree(df1)
distances, indexes = KDTree.query(df2, n_jobs=-1)
我在这里设置 n_jobs=-1
以使用所有可用的处理器。
我使用 numpy 为 2D 点列表编写了这个解决方案。它将快速找到两个点阵列之间最近的一对点。我尝试了两个列表,每个列表都有 1000 万个点,并在大约 4 分钟内得到了答案。每边200万点,只用了42秒。我不知道这是否足以满足您的需求,但它肯定比 "days" 快。如果您也需要,它还可以为更高的维度提供良好的性能。
def closest(A,B):
def bruteForce(A,B):
d = None
swap = A.shape[0] > B.shape[0]
if swap: A,B = B,A
for pA in A:
daB = np.sum((pA-B)**2,axis=1)
iMin = np.argmin(daB)
if d is None or daB[iMin] < d:
a,b = pA,B[iMin]
d = sum((a-b)**2)
if swap: a,b = b,a
return a,b,sqrt(d)
# small sizes are faster using brute force
if A.shape[0] * B.shape[0] < 1000000 \
or A.shape[0] < 20 or B.shape[0] < 20:
return bruteForce(A,B)
# find center position
midA = np.sum(A,axis=0)/A.shape[0]
midB = np.sum(B,axis=0)/B.shape[0]
midAB = (midA+midB)/2
# closest A to center position
A2midAB = np.sum((A-midAB)**2,axis=1)
iA = np.argmin(A2midAB)
pA = A[iA]
# closest B to pA
B2pA = np.sum((B-pA)**2,axis=1)
iB = np.argmin(B2pA)
pB = B[iB]
dAB = sqrt(sum((pA-pB)**2))
# distance of zero is best solution, return immediately
if dAB == 0: return pA,pB,dAB
# slope of ptA-ptB segment
if pA[0] == pB[0]: p,m = 0,1
else: p,m = 1,(pB[1]-pA[1])/(pB[0]-pA[0])
# perpendicular line intersections with x axis from each point
xA = m*A[:,1] + p*A[:,0]
xB = m*B[:,1] + p*B[:,0]
# baselines for ptA and ptB
baseA = xA[iA]
baseB = xB[iB]
rightSide = (baseB > baseA)
# partitions
ArightOfA = (xA > baseA) == rightSide
BrightOfA = (xB > baseA) == rightSide
AleftOfB = (xA > baseB) != rightSide
BleftOfB = (xB > baseB) != rightSide
# include pB and exclude pA (we already know its closest point in B)
ArightOfA[iA] = False
AleftOfB[iA] = False
BleftOfB[iB] = True
BrightOfA[iB] = True
# recurse left side
if np.any(AleftOfB) and np.any(BleftOfB):
lA,lB,lD = closest(A[AleftOfB],B[BleftOfB])
if lD < dAB: pA,pB,dAB = lA,lB,lD
# resurse right side
if np.any(ArightOfA) and np.any(BrightOfA):
rA,rB,rD = closest(A[ArightOfA],B[BrightOfA])
if rD < dAB: pA,pB,dAB = rA,rB,rD
return pA,pB,dAB
使用两组随机的 2D 点进行测试,每个点有 1000 万个点:
dimCount = 2
ACount = 10000000
ASpread = ACount
BCount = ACount-1
BSpread = BCount
A = np.random.random((ACount,dimCount))*ASpread-ASpread/2
B = np.random.random((BCount,dimCount))*BSpread-BSpread/2
a,b,d = closest(A,B)
print("closest points:",a,b,"distance:",d)
# closest points: [-4422004.2963273 2783038.35968559] [-4422004.76974851 2783038.61468366] distance: 0.5377282447465505
它的工作方式是根据战略性 selected 对 (pA,pB) 划分 A 点和 B 点。 pA 和 pB 之间的线用作两个列表的点的分区。然后递归地使用此分区的每一侧来查找其他(更接近的)点对。
在图形上,这对应于基于 pA-pB 线段垂直线的分区:
selecting pA 和 pB 的策略是找到两组点的近似中心,并从列表 A 中选择一个靠近该中心的点 (pA)。然后 select 列表 B 中最接近 pA 的点。这确保两条垂直线之间没有更接近另一个列表中的 pA 或 pB 的点。
A点和B点在垂线两侧的距离必然比pA-pB更远,所以可以将它们隔离成两个sub-lists,分别处理。
这允许 "divide and conquer" 方法大大减少要比较的 point-to-point 距离的数量。
在我的测试中(使用随机分布的点),性能似乎与 A 和 B 中的总点数成线性比例。我尝试通过创建距离较远的点的小簇来扭曲分布(这样就不会点实际上靠近近似中心)并且性能仍然是线性的。我不确定是否有任何 "worst case" 点分布会导致性能下降(我还没有找到)