优雅地完成对两个不等长数组的并行操作

Elegant way to complete a parallel operation on two arrays of unequal lengths

我想在 numba 中编写一个函数,对 2 个数组运行数学运算,并在两个数组的元素计数不同时进行调整。

因此,例如:假设我想要一个函数,将数组 a 的每个元素添加到数组 b 的元素,并具有以下 3 种可能的情况:

1) ab 的项目数相同,做 c[ii]=a[ii]+b[ii]

2) a 的项目比 b 多:执行 c[ii]=a[ii]+b[ii] 直到 b 的上限,并完成 c[ii]=a[ii]+b[-1]

3) a 的项目比 b 少:执行 c[ii]=a[ii]+b[ii] 直到 a 的上限,并完成 c[ii]=a[-1]+b[ii]

为此我写了下面的代码,它在处理数百万个值时运行良好并且速度很快,但是我可以清楚地看到三个几乎相同的代码块,感觉非常浪费。加上循环中的if/else运行也感觉很糟糕。

from numba import jit, float64, int32

@jit(float64[:](float64[:], float64[:]), nopython=True)
def add(a, b):

    # Both shapes are equal: add between a[i] and b[i]
    if a.shape[0] == b.shape[0]:
        c = np.empty(a.shape)

        for i in range(a.shape[0]):
            c[i] = a[i] + b[i]

        return c

    # a has more entries than b: add between a[i] and b[i] until b.shape[0]-1 is reached.
    # finish the loop with add between a[i] and b[-1]
    elif a.shape[0] > b.shape[0]:
        c = np.empty(a.shape)
        i_ = b.shape[0]-1 # upper limit of b's shape

        for i in range(a.shape[0]):
            if i < b.shape[0]:
                c[i] = a[i] + b[i]
            else:
                c[i] = a[i] + b[i_]

        return c

    # b has more entries than a: add between a[i] and b[i] until a.shape[0]-1 is reached.
    # finish the loop with add between a[-1] and b[i]    
    else:
        c = np.empty(b.shape)
        i_ = a.shape[0]-1 # upper limit of a's shape

        for i in range(b.shape[0]):
            if i < a.shape[0]:
                c[i] = a[i] + b[i]
            else:
                c[i] = a[i_] + b[i]

        return c

我是 numbajit python 代码编译的新手,所以这可能只是 "the most efficient way" 做我想做的。

但是,如果有更优雅的方式在不牺牲速度的情况下做到这一点,我很想知道如何做。

but I can clearly see three nearly identical code blocks which feel terribly wasteful.

是的,您在代码中重复了很多自己的话。另一方面,很容易看出每个案例的作用。

您可以简单地使用两个循环来代替:

import numba as nb

@nb.njit(nb.float64[:](nb.float64[:], nb.float64[:]))
def add2(a, b):
    size1, size2 = a.shape[0], b.shape[0]
    maxsize, minsize = max(size1, size2), min(size1, size2)
    c = np.empty(maxsize)

    # Calculate the elements which are present in a and b
    for idx in range(minsize):
        c[idx] = a[idx] + b[idx]

    # Check which array is longer and which fillvalue should be applied
    if size1 > size2:
        missing = a
        filler = b[-1]
    else:
        missing = b
        filler = a[-1]

    # Calculate the elements after a or b ended. In case they have equal lengths
    # the range is of length 0 so it won't enter.
    for idx in range(minsize, maxsize):
        c[idx] = missing[idx] + filler

    return c

重复少了很多,但可能不那么清楚。

Plus the if/else running in a loop also feels terrible.

实际上它并不像看起来那么糟糕,因为分支预测使得这个 if 非常便宜。只要两个数组仍然有元素,它就会是 True,并且只有在一个数组用完时才切换到 False(此后保持 False)。您的计算机很容易预测到这一点,因此这张支票会非常便宜(几乎是免费的)。

一夜之间,我意识到我可以做的是动态剪辑索引:

@njit(float64[:](float64[:], float64[:]))
def add_clamped(a,b):
    # Find the maximum indices to use for clipping purposes
    max_a, max_b = a.shape[0]-1, b.shape[0]-1
    maxsize = max(a.shape[0], b.shape[0])
    c = np.empty(maxsize)    

    # Run throught the arrays and clip indices on the fly
    for idx in range(maxsize):
        idx_a = min(idx, max_a)
        idx_b = min(idx, max_b)

        # Do some crazy expensive math here
        c[idx] = a[idx_a] + b[idx_b]    

    return c

作为测试,我比较了超过 1000 万个条目的算法,结果如下:

add_original:  0.01952 seconds
add_MSeifert:  0.02058 seconds
add_clamped:   0.02562 seconds

所以没有@MSeifert 的回答那么快,但是将代码保持为 1 个循环并将所有核心数学放在一个地方(当做比添加 2 个数组更复杂的事情时)。