查找二维点组之间的最小距离(快速且不太消耗内存)
Find minimum distances between groups of points in 2D (fast and not too memory consuming)
我在 2D A
和 B
中有两组点,我需要找到 A
中每个点到 [=13= 中的点的最小距离].到目前为止,我一直在使用 SciPy 的 cdist 和下面的代码
import numpy as np
from scipy.spatial.distance import cdist
def ABdist(A, B):
# Distance to all points in B, for each point in A.
dist = cdist(A, B, 'euclidean')
# Indexes to minimum distances.
min_dist_idx = np.argmin(dist, axis=1)
# Store only the minimum distances for each point in A, to a point in B.
min_dists = [dist[i][md_idx] for i, md_idx in enumerate(min_dist_idx)]
return min_dist_idx, min_dists
N = 10000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))
min_dist_idx, min_dists = ABdist(A, B)
这对于 N
的小值来说效果很好。但是现在集合的长度从 N=10000
增加到 N=35000
而我 运行 变成了
dm = np.zeros((mA, mB), dtype=np.double)
MemoryError
我知道我可以用一个 for 循环替换 cdist
,它只保留 A
中每个点到 B
中每个点的最小距离(和索引),因为这就是我所需要的。我不需要完整的 AxB
距离矩阵。但我一直在使用 cdist
正是因为它很快。
有没有办法用(几乎?)一样快但不占用那么多内存的实现替换 cdist
?
这里的诀窍是最大化计算与内存的比率。输出的长度为 N
,A
中的每个点都有一个索引和距离。我们可以将其减少到一个循环,每次迭代一个输出元素,这样每次迭代将处理所有 B
个点,这带来了高计算率。
因此,利用受 启发的 einsum
和 matrix-multiplication
,对于 A
中的每个点 pt
,我们将得到平方欧氏距离,像这样 -
for pt in A:
d = np.einsum('ij,ij->i',B,B) + pt.dot(pt) - 2*B.dot(pt)
因此,概括它涵盖 A
和预计算 np.einsum('ij,ij->i',B,B)
中的所有点,我们将有一个像这样的实现 -
min_idx = np.empty(N, dtype=int)
min_dist = np.empty(N)
Bsqsum = np.einsum('ij,ij->i',B,B)
for i,pt in enumerate(A):
d = Bsqsum + pt.dot(pt) - 2*B.dot(pt)
min_idx[i] = d.argmin()
min_dist[i] = d[min_idx[i]]
min_dist = np.sqrt(min_dist)
分块工作
现在,完全矢量化的解决方案是 -
np.einsum('ij,ij->i',B,B)[:,None] + np.einsum('ij,ij->i',A,A) - 2*B.dot(A.T)
因此,要分块工作,我们会切出 A
行,这样做会更容易简单地重塑为 3D
,就像这样 -
chunk_size= 100 # Edit this as per memory setup available
# More means more memory needed
A.shape = (A.shape[0]//chunk_size, chunk_size,-1)
min_idx = np.empty((N//chunk_size, chunk_size), dtype=int)
min_dist = np.empty((N//chunk_size, chunk_size))
Bsqsum = np.einsum('ij,ij->i',B,B)[:,None]
r = np.arange(chunk_size)
for i,chnk in enumerate(A):
d = Bsqsum + np.einsum('ij,ij->i',chnk,chnk) - 2*B.dot(chnk.T)
idx = d.argmin(0)
min_idx[i] = idx
min_dist[i] = d[idx,r]
min_dist = np.sqrt(min_dist)
min_idx.shape = (N,)
min_dist.shape = (N,)
A.shape = (N,-1)
最好的方法是使用专门为最近邻搜索设计的数据结构,例如 k-d tree. For example, SciPy's cKDTree 允许您以这种方式解决问题:
from scipy.spatial import cKDTree
min_dists, min_dist_idx = cKDTree(B).query(A, 1)
无论是在计算还是内存使用方面,结果都比任何基于广播的方法更有效。
例如,即使有 1,000,000 个点,计算也不会 运行 内存不足,并且在我的笔记本电脑上只需要几秒钟:
N = 1000000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))
%timeit cKDTree(B).query(A, 1)
# 3.25 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
我在 2D A
和 B
中有两组点,我需要找到 A
中每个点到 [=13= 中的点的最小距离].到目前为止,我一直在使用 SciPy 的 cdist 和下面的代码
import numpy as np
from scipy.spatial.distance import cdist
def ABdist(A, B):
# Distance to all points in B, for each point in A.
dist = cdist(A, B, 'euclidean')
# Indexes to minimum distances.
min_dist_idx = np.argmin(dist, axis=1)
# Store only the minimum distances for each point in A, to a point in B.
min_dists = [dist[i][md_idx] for i, md_idx in enumerate(min_dist_idx)]
return min_dist_idx, min_dists
N = 10000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))
min_dist_idx, min_dists = ABdist(A, B)
这对于 N
的小值来说效果很好。但是现在集合的长度从 N=10000
增加到 N=35000
而我 运行 变成了
dm = np.zeros((mA, mB), dtype=np.double)
MemoryError
我知道我可以用一个 for 循环替换 cdist
,它只保留 A
中每个点到 B
中每个点的最小距离(和索引),因为这就是我所需要的。我不需要完整的 AxB
距离矩阵。但我一直在使用 cdist
正是因为它很快。
有没有办法用(几乎?)一样快但不占用那么多内存的实现替换 cdist
?
这里的诀窍是最大化计算与内存的比率。输出的长度为 N
,A
中的每个点都有一个索引和距离。我们可以将其减少到一个循环,每次迭代一个输出元素,这样每次迭代将处理所有 B
个点,这带来了高计算率。
因此,利用受 einsum
和 matrix-multiplication
,对于 A
中的每个点 pt
,我们将得到平方欧氏距离,像这样 -
for pt in A:
d = np.einsum('ij,ij->i',B,B) + pt.dot(pt) - 2*B.dot(pt)
因此,概括它涵盖 A
和预计算 np.einsum('ij,ij->i',B,B)
中的所有点,我们将有一个像这样的实现 -
min_idx = np.empty(N, dtype=int)
min_dist = np.empty(N)
Bsqsum = np.einsum('ij,ij->i',B,B)
for i,pt in enumerate(A):
d = Bsqsum + pt.dot(pt) - 2*B.dot(pt)
min_idx[i] = d.argmin()
min_dist[i] = d[min_idx[i]]
min_dist = np.sqrt(min_dist)
分块工作
现在,完全矢量化的解决方案是 -
np.einsum('ij,ij->i',B,B)[:,None] + np.einsum('ij,ij->i',A,A) - 2*B.dot(A.T)
因此,要分块工作,我们会切出 A
行,这样做会更容易简单地重塑为 3D
,就像这样 -
chunk_size= 100 # Edit this as per memory setup available
# More means more memory needed
A.shape = (A.shape[0]//chunk_size, chunk_size,-1)
min_idx = np.empty((N//chunk_size, chunk_size), dtype=int)
min_dist = np.empty((N//chunk_size, chunk_size))
Bsqsum = np.einsum('ij,ij->i',B,B)[:,None]
r = np.arange(chunk_size)
for i,chnk in enumerate(A):
d = Bsqsum + np.einsum('ij,ij->i',chnk,chnk) - 2*B.dot(chnk.T)
idx = d.argmin(0)
min_idx[i] = idx
min_dist[i] = d[idx,r]
min_dist = np.sqrt(min_dist)
min_idx.shape = (N,)
min_dist.shape = (N,)
A.shape = (N,-1)
最好的方法是使用专门为最近邻搜索设计的数据结构,例如 k-d tree. For example, SciPy's cKDTree 允许您以这种方式解决问题:
from scipy.spatial import cKDTree
min_dists, min_dist_idx = cKDTree(B).query(A, 1)
无论是在计算还是内存使用方面,结果都比任何基于广播的方法更有效。
例如,即使有 1,000,000 个点,计算也不会 运行 内存不足,并且在我的笔记本电脑上只需要几秒钟:
N = 1000000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))
%timeit cKDTree(B).query(A, 1)
# 3.25 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)