python 中两个数组的高效逐元素匹配

Efficient element-wise matching of two arrays in python

我正在处理匹配问题。我有两个大小相同的数组 AB(假设 1000x1000)。对于 A 中的每个元素 a_ijij 分别是行索引和列索引),我需要找到最接近的元素 b_i? i-thB。目前,我想出了三个解决方案:

  1. 嵌套循环(耗时 5.47 秒)
  2. 带广播的单循环(需要 6.40 秒)
  3. 并行循环(耗时30.27s)

我认为以上三种方法都不够省时,或者至少我的实现不够好(第三种方法花费的时间比我预期的要长!)。如果您能指出我如何改进我的代码,那就太好了。提前致谢!

我目前的解决方案如下:

import numpy as np
import time
from joblib import Parallel, delayed

def method1(A,B): # Nested loop (as a Benchmark)
    output = np.zeros(A.shape)
    for r in range(A.shape[0]):
        rowB = B[r]
        for c in range(A.shape[1]):
            elementA = A[r,c]
            diff_vector = np.abs(elementA - rowB)
            output[r,c] = np.argmin(diff_vector)
    return output

def method2(A,B): # Broadcasting
    output = np.zeros(A.shape)
    for r in range(A.shape[0]):
        diff_matrix = np.abs(A[r][:, np.newaxis] - B[r])
        output[r] = np.argmin(diff_matrix, axis=1)
    return output

def method3(A,B): # Parallel for loop
    def matcher(r, A, B):
        i = r//A.shape[1]
        j = r % A.shape[1]

        elementA = A[i, j]
        rowB = B[i]
        diff_vector = np.abs(elementA - rowB)
        return np.argmin(diff_vector)

    output = Parallel(n_jobs=4)(delayed(matcher)(r, A, B) for r in range(A.shape[0]*A.shape[1]))
    output = np.reshape(output, [A.shape[0], A.shape[1]])
    return output

A = np.random.randn(1000,1000)
B = np.random.randn(1000,1000)
output1 = method1(A,B)
output2 = method2(A,B)
output3 = method3(A,B)

该算法显然效率低下(所有方法):检查rowB的所有项目以找到最接近的一项是昂贵的并导致数十亿次浮点运算。更不用说 Numpy 为每次调用 elementA - rowBnp.abs.

创建一个昂贵的不必要的 临时数组

您可以通过执行 排序然后进行二进制搜索 来加快此过程。这种方法的时间复杂度为 O(n**2 log n),而初始方法的时间复杂度为 O(n**3)。您还可以使用 Numba 轻松编写此算法的 parallel 实现。此外,您不需要将输出存储在 float64 类型的数组中:基于整数的数组就足够了(更快,可能更小)。

这是最终的实现:

import numba as nb
import numpy as np

@nb.njit('int_[:,::1](float64[:,::1],float64[:,::1])', parallel=True)
def method4(A,B):
    mB = B.shape[1]
    output = np.empty(A.shape, dtype=np.int_)

    # Parallel loop
    for r in nb.prange(A.shape[0]):
        rowA = A[r]
        rowB = B[r]

        # Sort a row
        index_rowB = np.argsort(rowB)
        sorted_rowB = rowB[index_rowB]

        # Fast binary search in the sorted row
        # See: 
        idxs = np.searchsorted(sorted_rowB, rowA)
        left = np.fabs(rowA - sorted_rowB[np.maximum(idxs-1, 0)])
        right = np.fabs(rowA - sorted_rowB[np.minimum(idxs, mB-1)])
        prev_idx_is_less = (idxs == mB) | (left < right)

        # Find the position in the original unsorted array
        output[r] = index_rowB[idxs - prev_idx_is_less]

    return output

output4 = method4(A,B)

这比我的 10 核机器上最快的初始实现(method1)快 482 倍

method1:  4341 ms
method2:  5839 ms
method4:     9 ms