如何使用 numpy 改进我的自定义函数矢量化

How can I improve my custom function vectorization using numpy

我是 python 的新手,对矢量化更是新手。我试图向量化一个自定义相似度函数,该函数应该 return 输入数组中每一行之间的成对相似度矩阵。

进口:

import numpy as np
from itertools import product
from numpy.lib.stride_tricks import sliding_window_view

输入:

np.random.seed(11)

a = np.array([0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0, 5, 0, 0, 10])
b = np.array([0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0, 5])
c = np.array([0, 0, 5, 1, 0, 20, 0, 0, 0, 30, 0, 1, 10, 0, 0, 5])

m = np.array((a,b,c))

输出:

custom_func(m)

array([[   0,  440, 1903],
       [ 440,    0, 1603],
       [1903, 1603,    0]])

功能:

def custom_func(arr):
    diffs = 0
    max_k = 6
    
    for n in range(1, max_k):

        arr1 = np.array([np.sum(i, axis = 1) for i in sliding_window_view(arr, window_shape = n, axis = 1)])
    
        # this function uses np.maximum and np.minimum to subtract the max and min elements (element-wise) between two rows and then sum up the entire of that subtraction
        diffs += np.sum((np.array([np.maximum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))]) - np.array([np.minimum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))])), axis = 1) * n
    
    diffs = diffs.reshape(len(arr), -1)
    
    return diffs

函数非常简单,它总结了N滑动windows中行的最大值和最小值之间的元素差异。这个函数比我今天发现矢量化之前使用的函数快得多(for loops and pandas dataframes yay)。

我的第一个想法是想出一种方法来一次找到我的数组的最小值和最大值,因为我目前认为它必须执行两次,但我无法弄清楚如何。在我当前的函数中还有一个 for 循环,因为我需要为多个 N 滑动执行此操作 windows,我不确定如何在没有循环的情况下执行此操作。

感谢任何帮助!

您可以从删除第一个列表理解开始:

arr1 = sliding_window_view(arr, window_shape = n, axis = 1).sum(axis=2)

我不会碰那么长的 diffs 行:(

以下是您可以对代码应用的几种优化:

  • 使用 Numba 的 JIT 来加速计算并用嵌套循环替换 product 调用
  • 使用更高效的滑动window算法(更好的复杂度)
  • 避免在循环productarrange中计算多次
  • 减少隐式临时数组分配的数量(和数组 Numpy 调用)
  • 不要计算 diffs 的下三角部分,因为它总是 对称的
    (只复制上三角部分)
  • 使用基于整数的索引而不是慢速浮点索引

这是生成的代码:

import numpy as np
from itertools import product
from numpy.lib.stride_tricks import sliding_window_view
import numba as nb

@nb.njit
def custom_func_fast(arr):
    h, w = arr.shape[0], arr.shape[1]
    diffs = np.zeros((h, h), dtype=arr.dtype)
    max_k = 6

    for n in range(1, max_k):
        arr1 = np.empty(shape=(h, w-n+1), dtype=arr.dtype)

        for i in range(h):
            # Efficient sliding window algorithm
            assert w >= n
            s = np.sum(arr[i, 0:n])
            arr1[i, 0] = s
            for j in range(n, w):
                s -= arr[i, j-n]
                s += arr[i, j]
                arr1[i, j-n+1] = s

        # Efficient distance matrix computation
        for i in range(h):
            for j in range(i+1, h):
                s = 0
                for k in range(w-n+1):
                    s += np.abs(arr1[i,k] - arr1[j,k])
                diffs[i, j] += s * n

    # Fill the lower triangular part
    for i in range(h):
        for j in range(i):
            diffs[i, j] = diffs[j, i]

    return diffs

生成的代码在我的机器上的示例输入数组上快 290 倍