优化 numpy python 函数以获得正交距离

optimize numpy python function to get orthogonal distance

我有3个数组(x_array、y_array、p_array),前两个对应于坐标点为随机点的二维数组,第三个是一个展平数组线对应的点。

我需要计算每个 x_array、y_array 点到由 p_array 点形成的直线的最小正交距离。

为此,我编写了两个主要使用 numpy 的函数。第一个函数创建一个布尔掩码来获取哪些点与线具有正交投影,这将生成一个具有 True 和 False 值的二维数组。第二个函数计算到直线的正交距离或与点的交点的正交投影。

此过程有效,但它是 GUI 的一部分,其中此过程对时间敏感。 我想加快此功能,但我对 numpy 的一般编程知识不足以改进此功能。我已经使用 numba 编写了这个,但是第一个函数的时间从 1.68 秒减少到 1.09。

目标

我的目标是将两个函数的时间都减少到 1 秒以下,我不知道这是否可能。 我将不胜感激

结果

mask:  1.6823785305023193
mask numba:  1.0962464809417725
orto distance:  1.630366563796997

代码

数据:

https://drive.google.com/file/d/1GHDSH1ycQSiBJk8RvkTNiE_RoetwCZw6/view?usp=sharing https://drive.google.com/file/d/1k8wCop1fePEh7ANfkScQ_BHLGRVo-aJe/view?usp=sharing https://drive.google.com/file/d/1swhg1BXBh18xyuueOKUNNx3SVGW8Jfq9/view?usp=sharing

import  numpy as np
import time



def ortoSegmentPoint_all(p_array, x_array, y_array):
    """
    :param p_array: np.array([[ 718898.941  9677612.901 ], [ 718888.8227 9677718.305 ], [ 719033.0528 9677770.692 ]])
    :param y_array: np.array([9677656.39934991 9677720.27550726 9677754.79])
    :param x_array: np.array([718895.88881594 718938.61392781 718961.46])
    :return: [POINT, LINE] indexes where point is orthogonal to line segment
    """
    # PENDIENTE "m" de la recta, y = mx + n
    m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
    # PENDIENTE INVERTIDA, 1/m
    inv_m_array = np.divide(1, m_array)
    # VALOR "n", y = mx + n
    n_array = y_array[:, 0] - x_array[:, 0] * m_array
    # VALOR "n_orto" PARA LA RECTA PERPENDICULAR
    _len_p_array = len(p_array)
    n_orto_array = np.reshape(p_array[:, 1], (_len_p_array, 1)) + inv_m_array * np.reshape(p_array[:, 0], (_len_p_array, 1))
    # PUNTOS DONDE SE INTERSECTAN DE FORMA PERPENDICULAR
    x_intersec_array = np.divide(n_orto_array - n_array, m_array + inv_m_array)
    y_intersec_array = m_array * x_intersec_array + n_array
    # LISTAR COORDENADAS EN PARES
    # FILAS: NUMERO DE PUNTOS, COLUMNAS: NUMERO DE TRAMOS
    maskX = np.where(np.logical_and(x_intersec_array < np.max(x_array, axis=1), x_intersec_array > np.min(x_array, axis=1)), True, False)
    maskY = np.where(np.logical_and(y_intersec_array < np.max(y_array, axis=1), y_intersec_array > np.min(y_array, axis=1)), True, False)
    mask = maskY * maskX
    return mask

def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
    ' a=(yA−yB), b=(xB−xA) and c=xAyB−xByA '
    a, b = np.array([x_array[:, 0], y_array[:, 0]]), np.array([x_array[:, 1], y_array[:, 1]])
    a0, b0, c0 = a[1, :] - b[1, :], b[0, :] - a[0, :], a[0, :] * b[1, :] - b[0, :] * a[1, :]
    index_point = np.unique(index_point)
    _len = len(p_array[index_point])
    _x = p_array[index_point][:, 0].reshape(_len, 1)
    _y = p_array[index_point][:, 1].reshape(_len, 1)
    _a0, _b0, _c0 = np.full(shape=(_len, a0.size), fill_value=a0), np.full(shape=(_len, b0.size),  fill_value=b0), np.full(shape=(_len, c0.size), fill_value=c0)
    _d0 = np.abs(_a0 * _x + _b0 * _y + _c0)
    _d1 = np.sqrt((_a0 * _a0) + (_b0 * _b0))
    _d = np.divide(_d0, _d1) * mask[index_point, :]
    _d = np.where(_d == 0, 1000000.0, _d)
    return _d, index_point


p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')


#CREATE BOOLEAN MASK

t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)

# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)

t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)

Numba 几乎无法加速 Numpy 函数,因为它们已经进行了大部分优化。然而,Numpy 代码的性能可以通过避免 creation/filling of many huge temporary array 来提高。事实上,与现代处理器的处理能力相比,RAM 吞吐量是一种宝贵的稀缺资源(自过去 4 年以来情况越来越糟,所以没有人期望这种情况会很快改变——这被称为 Memory Wall )。解决这个问题的方法很简单,在循环嵌套中一次执行许多 Numpy 操作。在您的情况下,大多数操作都可以在 L1 缓存甚至寄存器中执行。处理器可以以几个数量级的方式对它们进行操作。此外,循环可以很容易地 并行化 。我还发现嵌套循环更易于阅读和优化。

这是生成的代码(请注意,由于 Numba 还不完全支持 Numpy,因此修改了一些调用):

import  numpy as np
import  numba as nb
import time


@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1])', parallel=True)
def ortoSegmentPoint_all(p_array, x_array, y_array):
    n = p_array.shape[0]
    m = x_array.shape[0]
    m_array = np.divide(y_array[:, 0] - y_array[:, 1], x_array[:, 0] - x_array[:, 1])
    inv_m_array = np.divide(1, m_array)
    n_array = y_array[:, 0] - x_array[:, 0] * m_array
    mask = np.empty((n, m), dtype=np.bool_)
    x_min, x_max = np.minimum(x_array[:,0], x_array[:,1]), np.maximum(x_array[:,0], x_array[:,1])
    y_min, y_max = np.minimum(y_array[:,0], y_array[:,1]), np.maximum(y_array[:,0], y_array[:,1])
    for i in nb.prange(n):
        for j in range(m):
            n_orto = p_array[i, 1] + inv_m_array[j] * p_array[i, 0]
            x_intersec = (n_orto - n_array[j]) / (m_array[j] + inv_m_array[j])
            y_intersec = m_array[j] * x_intersec + n_array[j]
            maskX = x_min[j] < x_intersec < x_max[j]
            maskY = y_min[j] < y_intersec < y_max[j]
            mask[i, j] = maskX & maskY
    return mask

@nb.njit('(float64[:,::1], float64[:,::1], float64[:,::1], int64[::1], bool_[:,::1])', parallel=True)
def ortoDistancePointLine_indx(x_array, y_array, p_array, index_point, mask):
    a = np.vstack((x_array[:, 0], y_array[:, 0]))
    b = np.vstack((x_array[:, 1], y_array[:, 1]))
    a0 = a[1, :] - b[1, :]
    b0 = b[0, :] - a[0, :]
    c0 = a[0, :] * b[1, :] - b[0, :] * a[1, :]
    index_point = np.unique(index_point)
    n = index_point.size
    m = x_array.shape[0]
    _d = np.empty((n, m), np.float64)
    for i in nb.prange(n):
        for j in range(m):
            point_id = index_point[i]
            _x, _y = p_array[point_id]
            _d0 = np.abs(a0[j] * _x + b0[j] * _y + c0[j])
            _d1 = np.sqrt(a0[j]**2 + b0[j]**2)
            tmp = (_d0 / _d1) * mask[point_id, j]
            if tmp == 0:
                tmp = 1000000.0
            _d[i, j] = tmp
    return _d, index_point

p_array = np.load('p_array.npy')
x_array = np.load('x_array.npy')
y_array = np.load('y_array.npy')

#CREATE BOOLEAN MASK
t0 = time.time()
mask = ortoSegmentPoint_all(p_array, x_array, y_array)
print('mask: ', time.time() - t0)

# ORTHOGONAL DISTANCE
rows, cols = np.where(mask)
rows = rows.copy()

t0 = time.time()
dist_orto, idx_orto = ortoDistancePointLine_indx(x_array, y_array, p_array, rows, mask)
print('orto distance: ', time.time() - t0)

以下是在我的 6 核机器上优化前后的输出:

Before:
    mask:           1.323328
    orto distance:  0.544939

After:
    mask:           0.040068
    orto distance:  0.025106

请注意,代码可以进一步优化。例如,您可以使用 fastmatherror_model 等 Numba 标志。您还可以 pre-compute m_array + inv_m_array 的逆除法。最后,您可以使用一些更先进的 micro-optimisations(例如寄存器平铺和循环拆分以改进矢量化),这也会使您的代码更复杂,这应该足够了。