优化 numpy python 函数以获得正交距离
optimize numpy python function to get orthogonal distance
我有3个数组(x_array、y_array、p_array),前两个对应于坐标点为随机点的二维数组,第三个是一个展平数组线对应的点。
我需要计算每个 x_array、y_array 点到由 p_array 点形成的直线的最小正交距离。
为此,我编写了两个主要使用 numpy
的函数。第一个函数创建一个布尔掩码来获取哪些点与线具有正交投影,这将生成一个具有 True 和 False 值的二维数组。第二个函数计算到直线的正交距离或与点的交点的正交投影。
此过程有效,但它是 GUI 的一部分,其中此过程对时间敏感。
我想加快此功能,但我对 numpy
的一般编程知识不足以改进此功能。我已经使用 numba
编写了这个,但是第一个函数的时间从 1.68 秒减少到 1.09。
目标
我的目标是将两个函数的时间都减少到 1 秒以下,我不知道这是否可能。
我将不胜感激
结果
mask: 1.6823785305023193
mask numba: 1.0962464809417725
orto distance: 1.630366563796997
代码
数据:
https://drive.google.com/file/d/1GHDSH1ycQSiBJk8RvkTNiE_RoetwCZw6/view?usp=sharing
https://drive.google.com/file/d/1k8wCop1fePEh7ANfkScQ_BHLGRVo-aJe/view?usp=sharing
https://drive.google.com/file/d/1swhg1BXBh18xyuueOKUNNx3SVGW8Jfq9/view?usp=sharing
import numpy as np
import time
def ortoSegmentPoint_all(p_array, x_array, y_array):
"""
:param p_array: np.array([[ 718898.941 9677612.901 ], [ 718888.8227 9677718.305 ], [ 719033.0528 9677770.692 ]])
:param y_array: np.array([9677656.39934991 9677720.27550726 9677754.79])
:param x_array: np.array([718895.88881594 718938.61392781 718961.46])
:return: [POINT, LINE] indexes where point is orthogonal to line segment
"""
# PENDIENTE "m" de la recta, y = mx + n
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
# PENDIENTE INVERTIDA, 1/m
inv_m_array = np.divide(1, m_array)
# VALOR "n", y = mx + n
n_array = y_array[:, 0] - x_array[:, 0] * m_array
# VALOR "n_orto" PARA LA RECTA PERPENDICULAR
_len_p_array = len(p_array)
n_orto_array = np.reshape(p_array[:, 1], (_len_p_array, 1)) + inv_m_array * np.reshape(p_array[:, 0], (_len_p_array, 1))
# PUNTOS DONDE SE INTERSECTAN DE FORMA PERPENDICULAR
x_intersec_array = np.divide(n_orto_array - n_array, m_array + inv_m_array)
y_intersec_array = m_array * x_intersec_array + n_array
# LISTAR COORDENADAS EN PARES
# FILAS: NUMERO DE PUNTOS, COLUMNAS: NUMERO DE TRAMOS
maskX = np.where(np.logical_and(x_intersec_array < np.max(x_array, axis=1), x_intersec_array > np.min(x_array, axis=1)), True, False)
maskY = np.where(np.logical_and(y_intersec_array < np.max(y_array, axis=1), y_intersec_array > np.min(y_array, axis=1)), True, False)
mask = maskY * maskX
return mask
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
' a=(yA−yB), b=(xB−xA) and c=xAyB−xByA '
a, b = np.array([x_array[:, 0], y_array[:, 0]]), np.array([x_array[:, 1], y_array[:, 1]])
a0, b0, c0 = a[1, :] - b[1, :], b[0, :] - a[0, :], a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
_len = len(p_array[index_point])
_x = p_array[index_point][:, 0].reshape(_len, 1)
_y = p_array[index_point][:, 1].reshape(_len, 1)
_a0, _b0, _c0 = np.full(shape=(_len, a0.size), fill_value=a0), np.full(shape=(_len, b0.size), fill_value=b0), np.full(shape=(_len, c0.size), fill_value=c0)
_d0 = np.abs(_a0 * _x + _b0 * _y + _c0)
_d1 = np.sqrt((_a0 * _a0) + (_b0 * _b0))
_d = np.divide(_d0, _d1) * mask[index_point, :]
_d = np.where(_d == 0, 1000000.0, _d)
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
Numba 几乎无法加速 Numpy 函数,因为它们已经进行了大部分优化。然而,Numpy 代码的性能可以通过避免 creation/filling of many huge temporary array 来提高。事实上,与现代处理器的处理能力相比,RAM 吞吐量是一种宝贵的稀缺资源(自过去 4 年以来情况越来越糟,所以没有人期望这种情况会很快改变——这被称为 Memory Wall )。解决这个问题的方法很简单,在循环嵌套中一次执行许多 Numpy 操作。在您的情况下,大多数操作都可以在 L1 缓存甚至寄存器中执行。处理器可以以几个数量级的方式对它们进行操作。此外,循环可以很容易地 并行化 。我还发现嵌套循环更易于阅读和优化。
这是生成的代码(请注意,由于 Numba 还不完全支持 Numpy,因此修改了一些调用):
import numpy as np
import numba as nb
import time
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def ortoSegmentPoint_all(p_array, x_array, y_array):
n = p_array.shape[0]
m = x_array.shape[0]
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
inv_m_array = np.divide(1, m_array)
n_array = y_array[:, 0] - x_array[:, 0] * m_array
mask = np.empty((n, m), dtype=np.bool_)
x_min, x_max = np.minimum(x_array[:,0], x_array[:,1]), np.maximum(x_array[:,0], x_array[:,1])
y_min, y_max = np.minimum(y_array[:,0], y_array[:,1]), np.maximum(y_array[:,0], y_array[:,1])
for i in nb.prange(n):
for j in range(m):
n_orto = p_array[i, 1] + inv_m_array[j] * p_array[i, 0]
x_intersec = (n_orto - n_array[j]) / (m_array[j] + inv_m_array[j])
y_intersec = m_array[j] * x_intersec + n_array[j]
maskX = x_min[j] < x_intersec < x_max[j]
maskY = y_min[j] < y_intersec < y_max[j]
mask[i, j] = maskX & maskY
return mask
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1], int64[::1], bool_[:,::1])', parallel=True)
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
a = np.vstack((x_array[:, 0], y_array[:, 0]))
b = np.vstack((x_array[:, 1], y_array[:, 1]))
a0 = a[1, :] - b[1, :]
b0 = b[0, :] - a[0, :]
c0 = a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
n = index_point.size
m = x_array.shape[0]
_d = np.empty((n, m), np.float64)
for i in nb.prange(n):
for j in range(m):
point_id = index_point[i]
_x, _y = p_array[point_id]
_d0 = np.abs(a0[j] * _x + b0[j] * _y + c0[j])
_d1 = np.sqrt(a0[j]**2 + b0[j]**2)
tmp = (_d0 / _d1) * mask[point_id, j]
if tmp == 0:
tmp = 1000000.0
_d[i, j] = tmp
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
rows = rows.copy()
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
以下是在我的 6 核机器上优化前后的输出:
Before:
mask: 1.323328
orto distance: 0.544939
After:
mask: 0.040068
orto distance: 0.025106
请注意,代码可以进一步优化。例如,您可以使用 fastmath
和 error_model
等 Numba 标志。您还可以 pre-compute m_array + inv_m_array
的逆除法。最后,您可以使用一些更先进的 micro-optimisations(例如寄存器平铺和循环拆分以改进矢量化),这也会使您的代码更复杂,这应该足够了。
我有3个数组(x_array、y_array、p_array),前两个对应于坐标点为随机点的二维数组,第三个是一个展平数组线对应的点。
我需要计算每个 x_array、y_array 点到由 p_array 点形成的直线的最小正交距离。
为此,我编写了两个主要使用 numpy
的函数。第一个函数创建一个布尔掩码来获取哪些点与线具有正交投影,这将生成一个具有 True 和 False 值的二维数组。第二个函数计算到直线的正交距离或与点的交点的正交投影。
此过程有效,但它是 GUI 的一部分,其中此过程对时间敏感。
我想加快此功能,但我对 numpy
的一般编程知识不足以改进此功能。我已经使用 numba
编写了这个,但是第一个函数的时间从 1.68 秒减少到 1.09。
目标
我的目标是将两个函数的时间都减少到 1 秒以下,我不知道这是否可能。 我将不胜感激
结果
mask: 1.6823785305023193
mask numba: 1.0962464809417725
orto distance: 1.630366563796997
代码
数据:
https://drive.google.com/file/d/1GHDSH1ycQSiBJk8RvkTNiE_RoetwCZw6/view?usp=sharing https://drive.google.com/file/d/1k8wCop1fePEh7ANfkScQ_BHLGRVo-aJe/view?usp=sharing https://drive.google.com/file/d/1swhg1BXBh18xyuueOKUNNx3SVGW8Jfq9/view?usp=sharing
import numpy as np
import time
def ortoSegmentPoint_all(p_array, x_array, y_array):
"""
:param p_array: np.array([[ 718898.941 9677612.901 ], [ 718888.8227 9677718.305 ], [ 719033.0528 9677770.692 ]])
:param y_array: np.array([9677656.39934991 9677720.27550726 9677754.79])
:param x_array: np.array([718895.88881594 718938.61392781 718961.46])
:return: [POINT, LINE] indexes where point is orthogonal to line segment
"""
# PENDIENTE "m" de la recta, y = mx + n
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
# PENDIENTE INVERTIDA, 1/m
inv_m_array = np.divide(1, m_array)
# VALOR "n", y = mx + n
n_array = y_array[:, 0] - x_array[:, 0] * m_array
# VALOR "n_orto" PARA LA RECTA PERPENDICULAR
_len_p_array = len(p_array)
n_orto_array = np.reshape(p_array[:, 1], (_len_p_array, 1)) + inv_m_array * np.reshape(p_array[:, 0], (_len_p_array, 1))
# PUNTOS DONDE SE INTERSECTAN DE FORMA PERPENDICULAR
x_intersec_array = np.divide(n_orto_array - n_array, m_array + inv_m_array)
y_intersec_array = m_array * x_intersec_array + n_array
# LISTAR COORDENADAS EN PARES
# FILAS: NUMERO DE PUNTOS, COLUMNAS: NUMERO DE TRAMOS
maskX = np.where(np.logical_and(x_intersec_array < np.max(x_array, axis=1), x_intersec_array > np.min(x_array, axis=1)), True, False)
maskY = np.where(np.logical_and(y_intersec_array < np.max(y_array, axis=1), y_intersec_array > np.min(y_array, axis=1)), True, False)
mask = maskY * maskX
return mask
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
' a=(yA−yB), b=(xB−xA) and c=xAyB−xByA '
a, b = np.array([x_array[:, 0], y_array[:, 0]]), np.array([x_array[:, 1], y_array[:, 1]])
a0, b0, c0 = a[1, :] - b[1, :], b[0, :] - a[0, :], a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
_len = len(p_array[index_point])
_x = p_array[index_point][:, 0].reshape(_len, 1)
_y = p_array[index_point][:, 1].reshape(_len, 1)
_a0, _b0, _c0 = np.full(shape=(_len, a0.size), fill_value=a0), np.full(shape=(_len, b0.size), fill_value=b0), np.full(shape=(_len, c0.size), fill_value=c0)
_d0 = np.abs(_a0 * _x + _b0 * _y + _c0)
_d1 = np.sqrt((_a0 * _a0) + (_b0 * _b0))
_d = np.divide(_d0, _d1) * mask[index_point, :]
_d = np.where(_d == 0, 1000000.0, _d)
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
Numba 几乎无法加速 Numpy 函数,因为它们已经进行了大部分优化。然而,Numpy 代码的性能可以通过避免 creation/filling of many huge temporary array 来提高。事实上,与现代处理器的处理能力相比,RAM 吞吐量是一种宝贵的稀缺资源(自过去 4 年以来情况越来越糟,所以没有人期望这种情况会很快改变——这被称为 Memory Wall )。解决这个问题的方法很简单,在循环嵌套中一次执行许多 Numpy 操作。在您的情况下,大多数操作都可以在 L1 缓存甚至寄存器中执行。处理器可以以几个数量级的方式对它们进行操作。此外,循环可以很容易地 并行化 。我还发现嵌套循环更易于阅读和优化。
这是生成的代码(请注意,由于 Numba 还不完全支持 Numpy,因此修改了一些调用):
import numpy as np
import numba as nb
import time
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def ortoSegmentPoint_all(p_array, x_array, y_array):
n = p_array.shape[0]
m = x_array.shape[0]
m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
inv_m_array = np.divide(1, m_array)
n_array = y_array[:, 0] - x_array[:, 0] * m_array
mask = np.empty((n, m), dtype=np.bool_)
x_min, x_max = np.minimum(x_array[:,0], x_array[:,1]), np.maximum(x_array[:,0], x_array[:,1])
y_min, y_max = np.minimum(y_array[:,0], y_array[:,1]), np.maximum(y_array[:,0], y_array[:,1])
for i in nb.prange(n):
for j in range(m):
n_orto = p_array[i, 1] + inv_m_array[j] * p_array[i, 0]
x_intersec = (n_orto - n_array[j]) / (m_array[j] + inv_m_array[j])
y_intersec = m_array[j] * x_intersec + n_array[j]
maskX = x_min[j] < x_intersec < x_max[j]
maskY = y_min[j] < y_intersec < y_max[j]
mask[i, j] = maskX & maskY
return mask
@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1], int64[::1], bool_[:,::1])', parallel=True)
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
a = np.vstack((x_array[:, 0], y_array[:, 0]))
b = np.vstack((x_array[:, 1], y_array[:, 1]))
a0 = a[1, :] - b[1, :]
b0 = b[0, :] - a[0, :]
c0 = a[0, :] * b[1, :] - b[0, :] * a[1, :]
index_point = np.unique(index_point)
n = index_point.size
m = x_array.shape[0]
_d = np.empty((n, m), np.float64)
for i in nb.prange(n):
for j in range(m):
point_id = index_point[i]
_x, _y = p_array[point_id]
_d0 = np.abs(a0[j] * _x + b0[j] * _y + c0[j])
_d1 = np.sqrt(a0[j]**2 + b0[j]**2)
tmp = (_d0 / _d1) * mask[point_id, j]
if tmp == 0:
tmp = 1000000.0
_d[i, j] = tmp
return _d, index_point
p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')
#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)
# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
rows = rows.copy()
t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)
以下是在我的 6 核机器上优化前后的输出:
Before:
mask: 1.323328
orto distance: 0.544939
After:
mask: 0.040068
orto distance: 0.025106
请注意,代码可以进一步优化。例如,您可以使用 fastmath
和 error_model
等 Numba 标志。您还可以 pre-compute m_array + inv_m_array
的逆除法。最后,您可以使用一些更先进的 micro-optimisations(例如寄存器平铺和循环拆分以改进矢量化),这也会使您的代码更复杂,这应该足够了。