加快两组路径的所有组合之间的距离计算
Speeding up distance calculation between all combinations of two sets of paths
我有两个代理,每个代理都有由 x
和 y
坐标定义的 100 个时间步长的路径。换句话说,单个路径的形状为 (100,2)
。现在代理 A
和 B
都有 1000 个唯一路径。我想计算 所有 路径组合之间的距离 每个时间步 。换句话说,我的最终输出应该具有 (1000,1000,100)
的形状。目前我使用以下方法:
import numpy as np
import time
np.random.seed(0)
N = 1000
A = np.random.rand(N,100,2)
B = np.random.rand(N,100,2)
t0 = time.time()
combinations = np.array(np.meshgrid(np.arange(N), np.arange(N)))
combinations = combinations.T.reshape(-1,2)
# Calculate distances
diff = A[combinations[:,0],:] - B[combinations[:,1],:] # Differences
distances = np.sqrt(np.einsum('ijk,ijk->ij',diff,diff)) # A bit faster than linalg norm
distances = np.reshape(distances, (N,N,100))
print('Time:', time.time() - t0)
但是,我不得不说这个方法相当慢(在我的机器上大约需要 1.2 秒)。有更快的方法吗?
所提供的 Numpy 实现的问题是它创建了巨大的临时缓冲区,填充起来成本很高(主要是因为它们存储在 RAM 并在运行中分配导致缓慢 页面错误 )。结果,代码完全受内存限制。
您可以使用 Numba 使用普通循环而不使用临时缓冲区(感谢 JIT 编译器)更有效地完成此操作。此外,Numba 可帮助您轻松地并行计算 。这是一个实现:
import numpy as np
import numba as nb
import time
# Calculate distances and put it in `distances`
@nb.njit('void(float64[:,:,::1], float64[:,:,::1], float64[:,:,::1])', parallel=True)
def computeDistance(A, B, distances):
n = A.shape[0]
assert A.shape == (n,100,2)
assert A.shape == B.shape
assert distances.shape == (n,n,100)
for i in nb.prange(n):
for j in range(n):
for k in range(100):
distances[i,j,k] = np.sqrt((A[i,k,0] - B[j,k,0])**2 + (A[i,k,1] - B[j,k,1])**2)
np.random.seed(0)
N = 1000
A = np.random.rand(N,100,2)
B = np.random.rand(N,100,2)
distances = np.full((N,N,100), 1.0) # Pre-allocation
t0 = time.time()
computeDistance(A, B, distances)
print('Time:', time.time() - t0)
循环被编译为非常高效的并行实现(使用SIMD 指令)。 distances
是 预分配的 以在迭代循环中获得更好的性能。在我的 6 核机器上,此代码快 32 倍(而且它仍然主要受内存限制)。如果您想获得更好的性能,您可以将距离存储在 32 位浮点数中(最多快 2 倍)。
我有两个代理,每个代理都有由 x
和 y
坐标定义的 100 个时间步长的路径。换句话说,单个路径的形状为 (100,2)
。现在代理 A
和 B
都有 1000 个唯一路径。我想计算 所有 路径组合之间的距离 每个时间步 。换句话说,我的最终输出应该具有 (1000,1000,100)
的形状。目前我使用以下方法:
import numpy as np
import time
np.random.seed(0)
N = 1000
A = np.random.rand(N,100,2)
B = np.random.rand(N,100,2)
t0 = time.time()
combinations = np.array(np.meshgrid(np.arange(N), np.arange(N)))
combinations = combinations.T.reshape(-1,2)
# Calculate distances
diff = A[combinations[:,0],:] - B[combinations[:,1],:] # Differences
distances = np.sqrt(np.einsum('ijk,ijk->ij',diff,diff)) # A bit faster than linalg norm
distances = np.reshape(distances, (N,N,100))
print('Time:', time.time() - t0)
但是,我不得不说这个方法相当慢(在我的机器上大约需要 1.2 秒)。有更快的方法吗?
所提供的 Numpy 实现的问题是它创建了巨大的临时缓冲区,填充起来成本很高(主要是因为它们存储在 RAM 并在运行中分配导致缓慢 页面错误 )。结果,代码完全受内存限制。
您可以使用 Numba 使用普通循环而不使用临时缓冲区(感谢 JIT 编译器)更有效地完成此操作。此外,Numba 可帮助您轻松地并行计算 。这是一个实现:
import numpy as np
import numba as nb
import time
# Calculate distances and put it in `distances`
@nb.njit('void(float64[:,:,::1], float64[:,:,::1], float64[:,:,::1])', parallel=True)
def computeDistance(A, B, distances):
n = A.shape[0]
assert A.shape == (n,100,2)
assert A.shape == B.shape
assert distances.shape == (n,n,100)
for i in nb.prange(n):
for j in range(n):
for k in range(100):
distances[i,j,k] = np.sqrt((A[i,k,0] - B[j,k,0])**2 + (A[i,k,1] - B[j,k,1])**2)
np.random.seed(0)
N = 1000
A = np.random.rand(N,100,2)
B = np.random.rand(N,100,2)
distances = np.full((N,N,100), 1.0) # Pre-allocation
t0 = time.time()
computeDistance(A, B, distances)
print('Time:', time.time() - t0)
循环被编译为非常高效的并行实现(使用SIMD 指令)。 distances
是 预分配的 以在迭代循环中获得更好的性能。在我的 6 核机器上,此代码快 32 倍(而且它仍然主要受内存限制)。如果您想获得更好的性能,您可以将距离存储在 32 位浮点数中(最多快 2 倍)。