计算两个numpy数组的向量之间的距离
Calculating distances between vectors of two numpy arrays
我有两个尺寸为 S x F 的 numpy 数组 R 和尺寸为 W N x M x F。具体让我们分配以下值 N = 5
、M = 7
、F = 3
、S = 4
数组 R 包含一组具有 F = 3
特征的样本 S = 4
。每行代表一个样本,每行代表一个特征。因此 R[0]
是第一个样本,R[1]
第二个样本,然后继续。每个 R[i-th]
条目包含 F
个元素,例如 R[0] = np.array([1, 4, -2])
.
这是一个初始化所有这些值的小片段,考虑到 MWE
import numpy as np
# Size of Map (rows, columns)
N, M = 5, 7
# Number of features
F = 3
# Sample size
S = 4
np.random.seed(13)
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
我们还可以看到给定的 "depth line" 的 numpy 数组 W,作为一个向量,也与每个向量具有相同的维度数组行 R(查看两个数组最后一维的大小很容易注意到这一点)。这样我就可以访问 W[2, 3]
并获得 np.array([ 2, 2, -1 ])
(这里的值只是示例)。
我创建了一个简单的函数来计算给定向量 r 到矩阵 "depth line" 的每个 "depth line" 的距离56=]W和return最近元素的位置W深度线到r
def nearest_vector_matrix_naive(r, W):
delta = np.zeros((N,M), dtype=int)
for i in range(N):
for j in range(M):
norm = 0
for k in range(F):
norm += (r[k] - W[i,j,k])**2
delta[i,j] = norm
norm = 0
win_idx = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return win_idx
当然这是一种非常幼稚的方法,我可以进一步优化下面的代码,从而获得巨大的性能提升。
def nearest_vector_matrix(r, W):
delta = np.sum((W[:,:] - r)**2, axis=2)
return np.unravel_index(np.argmin(delta, axis=None), delta.shape)
我可以像
一样简单地使用这个功能
nearest_idx = nearest_vector_matrix(R[0], W)
# Returns the nearest vector in W to R[0]
W[nearest_idx]
因为我有数组 R 和一堆样本,所以我使用以下代码片段来计算最接近样本数组的向量:
def nearest_samples_matrix(R, W):
DELTA = np.zeros((R.shape[0],2))
for idx, r in enumerate(R):
delta = np.sum((W[:,:] - r)**2, axis=2)
DELTA[idx] = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return DELTA
此函数returns 是一个包含S 行(S 是样本数)二维索引的数组。那就是 DELTA 具有 (S, 2)
形状(总是)。
我想知道如何在 nearest_samples_matrix
中替换 for
循环(例如广播)以进一步提高代码执行性能?
我不知道该怎么做。 (除了我在第一种情况下能够做到)
最佳解决方案取决于数组的输入大小
对于低维问题dim<20或者更小,a kdtree approach is usually the way to go. There are quite a lot of answers regarding this topic eg. 我几周前写过
如果问题的维度太高,您可以切换到蛮力算法。以下两种算法都比您的优化方法快得多,但在更大的输入大小和低维问题上比 kdtree 方法慢得多 O(log(n)) 而不是 O(n^2)。
暴力破解1
以下示例使用 here 中描述的算法。它在大维问题上非常快,因为大部分计算是在高度优化的矩阵乘法算法中完成的。
缺点是内存使用率高(所有距离都在一个函数调用中计算)和精度问题,因为计算方法更容易出错。
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
def nearest_samples_matrix_2(R,W):
R_Temp=R
W_Temp=W.reshape(-1,W.shape[2])
dist=euclidean_distances(R_Temp, W_Temp)
ind_1,ind_2=np.unravel_index(np.argmin(dist,axis=1),shape=(W.shape[0],W.shape[1]))
return np.vstack((ind_1,ind_2)).T
暴力破解2
这与您天真的方法非常相似,但使用 JIT 编译器 (Numba) 以获得良好的性能。临时数组不是必需的,精度应该很好(只要不发生溢出)。对于更大的输入尺寸,还有进一步优化(循环平铺)的空间。
import numpy as np
import numba as nb
#parallelization is only beneficial on larger input data
@nb.njit(fastmath=True,parallel=True,cache=True)
def nearest_samples_matrix_3(r, W):
ind_i=0
ind_j=0
out=np.empty((r.shape[0],2),dtype=np.int64)
for x in nb.prange(r.shape[0]):
delta=0
for k in range(W.shape[2]):
delta += (r[x,k] - W[0,0,k])**2
for i in range(W.shape[0]):
for j in range(W.shape[1]):
norm = 0
for k in range(W.shape[2]):
norm += (r[x,k] - W[i,j,k])**2
if norm < delta:
delta=norm
ind_i=i
ind_j=j
out[x,0]=ind_i
out[x,1]=ind_j
return out
计时
#small Arrays
N, M = 100, 200
F = 30
S = 50
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#your function
%timeit nearest_samples_matrix(R,W)
#268 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_2(R,W)
#5.62 ms ± 22.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit nearest_samples_matrix_3(R,W)
#3.68 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#larger arrays
N, M = 1_000, 2_000
F = 50
S = 100
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#%timeit nearest_samples_matrix_1(R,W)
#too slow
%timeit nearest_samples_matrix_2(R,W)
#2.76 s ± 17.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_3(R,W)
#1.42 s ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
我有两个尺寸为 S x F 的 numpy 数组 R 和尺寸为 W N x M x F。具体让我们分配以下值 N = 5
、M = 7
、F = 3
、S = 4
数组 R 包含一组具有 F = 3
特征的样本 S = 4
。每行代表一个样本,每行代表一个特征。因此 R[0]
是第一个样本,R[1]
第二个样本,然后继续。每个 R[i-th]
条目包含 F
个元素,例如 R[0] = np.array([1, 4, -2])
.
这是一个初始化所有这些值的小片段,考虑到 MWE
import numpy as np
# Size of Map (rows, columns)
N, M = 5, 7
# Number of features
F = 3
# Sample size
S = 4
np.random.seed(13)
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
我们还可以看到给定的 "depth line" 的 numpy 数组 W,作为一个向量,也与每个向量具有相同的维度数组行 R(查看两个数组最后一维的大小很容易注意到这一点)。这样我就可以访问 W[2, 3]
并获得 np.array([ 2, 2, -1 ])
(这里的值只是示例)。
我创建了一个简单的函数来计算给定向量 r 到矩阵 "depth line" 的每个 "depth line" 的距离56=]W和return最近元素的位置W深度线到r
def nearest_vector_matrix_naive(r, W):
delta = np.zeros((N,M), dtype=int)
for i in range(N):
for j in range(M):
norm = 0
for k in range(F):
norm += (r[k] - W[i,j,k])**2
delta[i,j] = norm
norm = 0
win_idx = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return win_idx
当然这是一种非常幼稚的方法,我可以进一步优化下面的代码,从而获得巨大的性能提升。
def nearest_vector_matrix(r, W):
delta = np.sum((W[:,:] - r)**2, axis=2)
return np.unravel_index(np.argmin(delta, axis=None), delta.shape)
我可以像
一样简单地使用这个功能nearest_idx = nearest_vector_matrix(R[0], W)
# Returns the nearest vector in W to R[0]
W[nearest_idx]
因为我有数组 R 和一堆样本,所以我使用以下代码片段来计算最接近样本数组的向量:
def nearest_samples_matrix(R, W):
DELTA = np.zeros((R.shape[0],2))
for idx, r in enumerate(R):
delta = np.sum((W[:,:] - r)**2, axis=2)
DELTA[idx] = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
return DELTA
此函数returns 是一个包含S 行(S 是样本数)二维索引的数组。那就是 DELTA 具有 (S, 2)
形状(总是)。
我想知道如何在 nearest_samples_matrix
中替换 for
循环(例如广播)以进一步提高代码执行性能?
我不知道该怎么做。 (除了我在第一种情况下能够做到)
最佳解决方案取决于数组的输入大小
对于低维问题dim<20或者更小,a kdtree approach is usually the way to go. There are quite a lot of answers regarding this topic eg.
如果问题的维度太高,您可以切换到蛮力算法。以下两种算法都比您的优化方法快得多,但在更大的输入大小和低维问题上比 kdtree 方法慢得多 O(log(n)) 而不是 O(n^2)。
暴力破解1
以下示例使用 here 中描述的算法。它在大维问题上非常快,因为大部分计算是在高度优化的矩阵乘法算法中完成的。 缺点是内存使用率高(所有距离都在一个函数调用中计算)和精度问题,因为计算方法更容易出错。
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
def nearest_samples_matrix_2(R,W):
R_Temp=R
W_Temp=W.reshape(-1,W.shape[2])
dist=euclidean_distances(R_Temp, W_Temp)
ind_1,ind_2=np.unravel_index(np.argmin(dist,axis=1),shape=(W.shape[0],W.shape[1]))
return np.vstack((ind_1,ind_2)).T
暴力破解2
这与您天真的方法非常相似,但使用 JIT 编译器 (Numba) 以获得良好的性能。临时数组不是必需的,精度应该很好(只要不发生溢出)。对于更大的输入尺寸,还有进一步优化(循环平铺)的空间。
import numpy as np
import numba as nb
#parallelization is only beneficial on larger input data
@nb.njit(fastmath=True,parallel=True,cache=True)
def nearest_samples_matrix_3(r, W):
ind_i=0
ind_j=0
out=np.empty((r.shape[0],2),dtype=np.int64)
for x in nb.prange(r.shape[0]):
delta=0
for k in range(W.shape[2]):
delta += (r[x,k] - W[0,0,k])**2
for i in range(W.shape[0]):
for j in range(W.shape[1]):
norm = 0
for k in range(W.shape[2]):
norm += (r[x,k] - W[i,j,k])**2
if norm < delta:
delta=norm
ind_i=i
ind_j=j
out[x,0]=ind_i
out[x,1]=ind_j
return out
计时
#small Arrays
N, M = 100, 200
F = 30
S = 50
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#your function
%timeit nearest_samples_matrix(R,W)
#268 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_2(R,W)
#5.62 ms ± 22.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit nearest_samples_matrix_3(R,W)
#3.68 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#larger arrays
N, M = 1_000, 2_000
F = 50
S = 100
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))
#%timeit nearest_samples_matrix_1(R,W)
#too slow
%timeit nearest_samples_matrix_2(R,W)
#2.76 s ± 17.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_3(R,W)
#1.42 s ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)