Efficiently checking Euclidean distance for a large number of objects in Python
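In a route-planning algorithm, I'm trying to filter a list of nodes based on their distance to another node. I'm actually pulling the lists from a crude scene graph. I use the term "cell" to refer to a volume within a simple scene graph from which we fetch a list of nodes that are close to each other.
Right now, I'm implementing this as: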
# SSCCE version of the core function
def nodes_in_range(src, cell, maxDist):
    srcX, srcY, srcZ = src.x, src.y, src.z
    maxDistSq = maxDist ** 2
    for node in cell:
        distSq = (node.x - srcX) ** 2
        if distSq > maxDistSq: continue
        distSq += (node.y - srcY) ** 2
        if distSq > maxDistSq: continue
        distSq += (node.z - srcZ) ** 2
        if distSq <= maxDistSq:
            yield node, distSq ** 0.5  # fast sqrt
from collections import namedtuple
class Node(namedtuple('Node', ('ID', 'x', 'y', 'z'))):
    # actual class has assorted other properties
    pass
# 1, 3 and 9 are <= 4.2 from Node(1)
cell = [
    Node(1, 0, 0, 0),
    Node(2, -2, -3, 4),
    Node(3, .1, .2, .3),
    Node(4, 2.3, -3.3, -4.5),
    Node(5, -2.5, 4.5, 5),
    Node(6, 4, 3., 2.),
    Node(7, -2.46, 2.46, -2.47),
    Node(8, 2.45, -2.46, -2.47),
    Node(9, .5, .5, .1),
    Node(10, 5, 6, 7),
    # In practice, cells have up to 600 entries
]
if __name__ == "__main__":
    for node, dist in nodes_in_range(cell[0], cell, 4.2):
        print("{:3n} {:5.2f}".format(node.ID, dist))
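This routine gets called a great many times (10^7+ times in some queries), so every bit of performance matters, and avoiding the member lookups by using the conditionals actually helped.
What I'm trying to do is switch this over to numpy and organize the cells so that I can vectorize. What I want to achieve is this: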
import numpy
import numpy.linalg
contarry = numpy.ascontiguousarray
float32 = numpy.float32
# The "np_cell" has two arrays: one is the list of nodes and the
# second is a vectorizable array of their positions.
# np_cell[1][N] == numpy array position of np_cell[0][N]
def make_np_cell(cell):
    return (
        cell,
        contarry([contarry((node.x, node.y, node.z), float32) for node in cell]),
    )
# This version fails because norm returns a single value.
def np_nodes_in_range1(srcPos, np_cell, maxDist):
    distances = numpy.linalg.norm(np_cell[1] - srcPos)
    for (node, dist) in zip(np_cell[0], distances):
        if dist <= maxDist:
            yield node, dist
# This version fails because the distances it computes are wrong
def np_nodes_in_range2(srcPos, np_cell, maxDist):
    # this will fail because the distances are wrong
    distances = numpy.linalg.norm(np_cell[1] - srcPos, ord=1, axis=1)
    for (node, dist) in zip(np_cell[0], distances):
        if dist <= maxDist:
            yield node, dist
# This version doesn't vectorize and so performs poorly
def np_nodes_in_range3(srcPos, np_cell, maxDist):
    norm = numpy.linalg.norm
    for (node, pos) in zip(np_cell[0], np_cell[1]):
        dist = norm(srcPos - pos)
        if dist <= maxDist:
            yield node, dist
if __name__ == "__main__":
    np_cell = make_np_cell(cell)
    srcPos = np_cell[1][0]  # Position column [1], first node [0]
    print("v1 - fails because it gets a single distance")
    try:
        for node, dist in np_nodes_in_range1(srcPos, np_cell, float32(4.2)):
            print("{:3n} {:5.2f}".format(node.ID, dist))
    except TypeError:
        print("distances was a single value")
    print("v2 - gets the wrong distance values")
    for node, dist in np_nodes_in_range2(srcPos, np_cell, float32(4.2)):
        print("{:3n} {:5.2f}".format(node.ID, dist))
    print("v3 - slower")
    for node, dist in np_nodes_in_range3(srcPos, np_cell, float32(4.2)):
        print("{:3n} {:5.2f}".format(node.ID, dist))
The full combined script is here. I also included a v4 that tries using enumerate instead of zip and found that it is about 12 µs slower.
Sample output:
  1  0.00
  3  0.37
  9  0.71
v1 - fails because it gets a single distance
distances was a single value
v2 - gets the wrong distance values
  1  0.00
  3  0.60
  9  1.10
v3 - slower
  1  0.00
  3  0.37
  9  0.71
v4 - v2 using enumerate
  1  0.00
  3  0.60
  9  1.10
As for performance, we can test this with timeit. I'll increase the number of nodes in the cell by simple multiplication:
In [2]: from sscce import *
In [3]: cell = cell * 32 # increase to 320 nodes
In [4]: len(cell)
Out[4]: 320
In [5]: %timeit -n 1000 -r 7 sum(1 for _ in nodes_in_range(cell[0], cell, 4.2))
1000 loops, best of 7: 742 µs per loop
In [6]: np_cell = make_np_cell(cell)
In [7]: srcPos = np_cell[1][0]
In [8]: %timeit -n 1000 -r 7 sum(1 for _ in np_nodes_in_range2(srcPos, np_cell, numpy.float32(4.2)))
1000 loops, best of 7: 136 µs per loop
In [9]: %timeit -n 1000 -r 7 sum(1 for _ in np_nodes_in_range3(srcPos, np_cell, numpy.float32(4.2)))
1000 loops, best of 7: 3.64 ms per loop
Highlights:
nodes_in_range
1000 loops, best of 7: 742 µs per loop
np_nodes_in_range2
1000 loops, best of 7: 136 µs per loop
np_nodes_in_range3
1000 loops, best of 7: 3.64 ms per loop # OUCH
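Questions:
What am I doing wrong with the vectorized distance calculation?
distances = numpy.linalg.norm(np_cell[1] - srcPos)
vs
distances = numpy.linalg.norm(np_cell[1] - srcPos, ord=1, axis=1)
Is this the best approach?
Cell populations vary between a few nodes and several hundred. I currently iterate over the cells, but it seems likely that I want to marshal a full set of candidates (nodes[], positions[]), despite the potential extra cost of building the list for this (I could always use a batch accumulator, so that I always try to fill the accumulator with at least 1024 positions before draining it). But I think this line of thinking is shaped by my use of contiguous arrays. Should I be looking towards something like:
nodes_in_range(src, chain(cell.nodes for cell in scene if cell_in_range(boundingBox)))
and not worry about trying to flatten the whole thing?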
What am I doing wrong with the vectorized distance calculation?
distances = numpy.linalg.norm(np_cell[1] - srcPos)
vs
distances = numpy.linalg.norm(np_cell[1] - srcPos, ord=1, axis=1)
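Firstly, if axis=None, np.linalg.norm will compute either a vector norm (if the input is 1D) or a matrix norm (if the input is multidimensional). Both of these are scalar quantities.
Secondly, ord=1 means the L1 norm (i.e. Manhattan distance), not the Euclidean distance that you mention in the title.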
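Putting those two points together, a minimal sketch of a vectorized version would keep the default ord (2, i.e. Euclidean) and pass axis=1 so that one norm is computed per row. The name np_nodes_in_range_fixed and the index-based filtering are illustrative choices of mine rather than part of the original code, and plain integer IDs stand in for the Node objects:

```python
import numpy as np

def np_nodes_in_range_fixed(src_pos, np_cell, max_dist):
    """Yield (node, distance) for nodes within max_dist of src_pos.

    np_cell is a (nodes, positions) pair, as built by make_np_cell.
    """
    nodes, positions = np_cell
    # axis=1 -> one Euclidean norm (ord=2 is the default) per row
    distances = np.linalg.norm(positions - src_pos, axis=1)
    # Select the matching rows in numpy instead of testing each one in Python
    for i in np.flatnonzero(distances <= max_dist):
        yield nodes[i], float(distances[i])

# Positions from the sample cell; integer IDs stand in for Node objects
ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
positions = np.array([
    (0, 0, 0), (-2, -3, 4), (.1, .2, .3), (2.3, -3.3, -4.5),
    (-2.5, 4.5, 5), (4, 3., 2.), (-2.46, 2.46, -2.47),
    (2.45, -2.46, -2.47), (.5, .5, .1), (5, 6, 7),
], dtype=np.float32)

result = list(np_nodes_in_range_fixed(positions[0], (ids, positions), 4.2))
for node_id, dist in result:
    print("{:3d} {:5.2f}".format(node_id, dist))
```

On the sample data this should select the same three nodes (1, 3 and 9) as the pure-Python nodes_in_range.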
- Is this the best approach?
A k-D tree would probably be much faster. You can use scipy.spatial.cKDTree to do a ball search to find the nodes that are within some threshold distance of the query point:
import numpy as np
from scipy.spatial import cKDTree
# it will be much easier (and faster) to deal with numpy arrays here (you could
# always look up the corresponding node objects by index if you wanted to)
X = np.array([(n.x, n.y, n.z) for n in cell])
# construct a k-D tree
tree = cKDTree(X)
# query it with the first point, find the indices of all points within a maximum
# distance of 4.2 of the query point
query_point = X[0]
idx = tree.query_ball_point(query_point, r=4.2, p=2)
# these indices are one out from yours, since they start at 0 rather than 1
print(idx)
# [0, 2, 8]
# query_ball_point doesn't return the distances, but we can easily compute these
# using broadcasting
neighbor_points = X[idx]
d = np.sqrt(((query_point[None, :] - neighbor_points) ** 2).sum(1))
print(d)
# [ 0. 0.37416574 0.71414284]
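If you want to keep the (node, distance) generator interface from the question, the two steps above could be wrapped roughly as follows. This is only a sketch: kd_nodes_in_range is a hypothetical helper name, and it assumes the tree is built once per cell and reused across queries, since rebuilding it for every call would likely cancel out the gain.

```python
from collections import namedtuple

import numpy as np
from scipy.spatial import cKDTree

Node = namedtuple('Node', ('ID', 'x', 'y', 'z'))

def kd_nodes_in_range(tree, nodes, positions, src_pos, max_dist):
    """Yield (node, distance) pairs, like nodes_in_range, from a prebuilt tree."""
    # Sort explicitly so the output order does not depend on the tree layout
    idx = sorted(tree.query_ball_point(src_pos, r=max_dist))
    # Distances are computed only for points that passed the ball query
    dists = np.linalg.norm(positions[idx] - src_pos, axis=1)
    for i, dist in zip(idx, dists):
        yield nodes[i], float(dist)

# A shortened version of the sample cell
nodes = [Node(1, 0, 0, 0), Node(2, -2, -3, 4), Node(3, .1, .2, .3),
         Node(9, .5, .5, .1), Node(10, 5, 6, 7)]
positions = np.array([(n.x, n.y, n.z) for n in nodes], dtype=float)
tree = cKDTree(positions)  # build once per cell, reuse for every query

found = list(kd_nodes_in_range(tree, nodes, positions, positions[0], 4.2))
for node, dist in found:
    print("{:3d} {:5.2f}".format(node.ID, dist))
```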
Benchmarking:
Querying a cKDTree is very fast, even for very large numbers of points:
X = np.random.randn(10000000, 3)
tree = cKDTree(X)
%timeit tree.query_ball_point(np.random.randn(3), r=4.2)
# 1 loops, best of 3: 229 ms per loop
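As you mentioned in the comments, the example above is a much harsher test of performance than your data. Because of the distance tolerance chosen, and because the data are Gaussian (and therefore clustered around 0), it matches about 99% of the 10 million points.
Here is a test on uniform data with a stricter distance cutoff that matches about 30% of the points, as in your example: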
%timeit tree.query_ball_point((0., 0., 0.), r=1.2)
# 10 loops, best of 3: 86 ms per loop
Clearly, this is a much larger number of points than you are using. For your example data:
tree = cKDTree(np_cell[1])
%timeit tree.query_ball_point(np_cell[1][0], r=4.2)
# The slowest run took 4.26 times longer than the fastest. This could mean that an intermediate result is being cached
# 100000 loops, best of 3: 16.9 µs per loop
This handily beats your np_nodes_in_range2 function on my machine:
%timeit sum(1 for _ in np_nodes_in_range2(srcPos, np_cell, numpy.float32(4.2)))
# The slowest run took 7.77 times longer than the fastest. This could mean that an intermediate result is being cached
# 10000 loops, best of 3: 84.4 µs per loop
Other things to consider:
If you need to query many points at once, it is more efficient to construct a second tree and use query_ball_tree rather than query_ball_point:
X = np.random.randn(100, 3)
Y = np.random.randn(10, 3)
tree1 = cKDTree(X)
tree2 = cKDTree(Y)
# indices contains a list-of-lists, where the ith sublist contains the indices
# of the neighbours of Y[i] in X
indices = tree2.query_ball_tree(tree1, r=4.2)
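For cells of only a few hundred nodes, it may also be worth timing the tree against a brute-force batch computed with broadcasting, since the full distance matrix is small at that size. The following is a sketch of that alternative, not something measured in the benchmarks above; pairwise_in_range is a hypothetical helper name.

```python
import numpy as np

def pairwise_in_range(X, Y, r):
    """For each row of Y, return the indices of the rows of X within distance r."""
    # Broadcasting gives diff the shape (len(Y), len(X), 3)
    diff = Y[:, None, :] - X[None, :, :]
    # Compare squared distances so the filter needs no square root
    dist_sq = np.einsum('ijk,ijk->ij', diff, diff)
    mask = dist_sq <= r * r
    return [np.flatnonzero(row) for row in mask]

X = np.array([(0, 0, 0), (1, 0, 0), (0, 5, 0), (3, 4, 0)], dtype=float)
Y = np.array([(0, 0, 0), (0, 4, 0)], dtype=float)

indices = pairwise_in_range(X, Y, r=1.5)
print([ix.tolist() for ix in indices])
# [[0, 1], [2]]
```

Memory grows with len(X) * len(Y), so this only makes sense while both sets stay small.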
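If you don't care about the indices and only want the number of points in the ball, it would probably be faster to use count_neighbors (with a scalar r, this returns the total number of pairs within range rather than one count per point):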
n_neighbors = tree2.count_neighbors(tree1, r=4.2)
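On the batching question, one option is to concatenate the position arrays of all candidate cells and run a single vectorized distance test over them. The sketch below assumes each cell is a (nodes, positions) pair like the np_cell tuple in the question; batch_nodes_in_range is a hypothetical name, and whether the cost of concatenating pays off would need to be measured on real data.

```python
from itertools import chain

import numpy as np

def batch_nodes_in_range(src_pos, np_cells, max_dist):
    """Filter the nodes of several (nodes, positions) cells in one pass."""
    np_cells = list(np_cells)
    if not np_cells:
        return []
    # Flatten the node lists and stack the position arrays in the same order
    nodes = list(chain.from_iterable(c[0] for c in np_cells))
    positions = np.concatenate([c[1] for c in np_cells])
    diff = positions - src_pos
    dist_sq = np.einsum('ij,ij->i', diff, diff)  # row-wise squared distance
    keep = np.flatnonzero(dist_sq <= max_dist ** 2)
    # Take the square root only for the nodes that passed the filter
    return [(nodes[i], float(np.sqrt(dist_sq[i]))) for i in keep]

# Strings stand in for Node objects in this illustration
cell_a = (['a1', 'a2'], np.array([(0., 0., 0.), (5., 6., 7.)]))
cell_b = (['b1', 'b2'], np.array([(.5, .5, .1), (-2., -3., 4.)]))

hits = batch_nodes_in_range(np.zeros(3), [cell_a, cell_b], 4.2)
print(hits)
```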