将 numba 与 np.concatenate 并行使用效率不高?

using numba with np.concatenate is not efficient in parallel?

我在 np.concatenate 高效并行化时遇到了一些问题。这是一个最小的工作示例。 (我知道在这里我可以分别对 ab 求和,但我专注于并行化连接操作,因为这是我需要在我的项目中做的事情。然后我会对连接数组,例如排序。)

无论我 运行 这个有多少个内核,似乎总是需要相同的时间(~10 秒)。如果有的话,它在更多内核上更慢。我尝试在装饰器中使用 nogil=True 在 cc 上发布 GIL,但无济于事。请注意,即使没有加速,所有核心在计算过程中显然都在使用。

有人能帮帮我吗?

非常感谢

from numba import prange, njit
import numpy as np


@njit()
def cc():

    r = np.random.rand(20)
    a = r[r < 0.5]
    b = r[r > 0.7]

    c = np.concatenate((a, b))

    return np.sum(c)


@njit(parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)
    for i in prange(n):
        result[i] = cc()

    return result

cc_wrap()

主要问题来自分配器的争用

cc 函数创建许多隐式小型临时数组。例如 np.random.randr < 0.5 甚至 a = r[condition] 一样,更不用说 np.concatenate。临时数组通常需要使用给定的分配器在堆中分配。标准库提供的默认分配器不能保证使用多线程可以很好地扩展。分配不能完美缩放,因为分配数据的线程之间需要昂贵的隐式同步。例如,一个线程可以分配一个被另一个线程删除的数组。在最坏的情况下,分配器可以序列化 allocations/deletes。因为与在分配的数据上执行的工作相比,分配数据的成本是巨大的,所以同步的开销是主要的,并且整个执行是串行的。实际上,情况更糟,因为 顺序时间已经被开销所支配

请注意,积极优化的编译器可以在 堆栈 上分配数组,因为它们不会 escape the function。然而,不幸的是,Numba 并没有明显地做到这一点。此外,假设 Numba 线程 never 删除由其他线程分配的数据(这可能是这种情况,但我不完全确定),分配器可以使用每线程池进行调整以很好地扩展.尽管如此,仍需要向通常不能很好扩展的操作系统请求分配的内存池(尤其是 Windows AFAIK)。

最好的解决办法就是避免创建隐式临时数组。这可以使用 结合 分区算法 。请注意,可以通过为 Numba 指定类型来[=44​​=]提前编译函数。

这是最终的实现:

import numba as nb
import numpy as np
import random

@nb.njit('float64(float64[::1])')
def cc(tempBuffer):
    assert tempBuffer.size >= 20

    # View to the temporary scratch buffer
    r = tempBuffer[0:20]

    # Generate 20 random numbers without any allocation
    for i in range(20):
        r[i] = random.random()

    j = 0

    # Partition the array so to put values smaller than
    # a condition in the beginning.
    # After the loop, r[0:split] contains the filtered values.
    for i in range(r.size):
        if r[i] < 0.5:
            r[i], r[j] = r[j], r[i]
            j += 1

    split = j

    # Partition the rest of the array.
    # After the loop, r[split:j] contains the other filtered values.
    for i in range(j, r.size):
        if r[i] > 0.7:
            r[i], r[j] = r[j], r[i]
            j += 1

    # Note that extracting contiguous views it cheap as 
    # it does not create a new temporary array
    # a = r[0:split]
    # b = r[split:j]
    c = r[0:j]

    return np.sum(c)

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)

    # Preallocate some space once for all threads
    globalTempBuffer = np.empty((nb.get_num_threads(), 64), dtype=np.float64)

    for i in nb.prange(n):
        threadId = nb.np.ufunc.parallel._get_thread_id()
        threadLocalBuffer = globalTempBuffer[threadId]
        result[i] = cc(threadLocalBuffer)

    return result

cc_wrap()

请注意,线程局部操作有点棘手,通常不需要。在这种情况下,仅使用分区算法减少分配就可以明显加快速度。但是,由于临时数组的大小和分配的数量非常小,分配的开销仍然很大。

另请注意,此代码中并不严格要求 r 数组,因为可以 就地 对随机数求和。这可能不符合您对真实用例的需求。这是一个(更简单的实现):

@nb.njit('float64()')
def cc():
    s = 0.0
    for i in range(20):
        e = random.random()
        if e < 0.5 or e > 0.7:
            s += e
    return s

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)
    for i in nb.prange(n):
        result[i] = cc()
    return result

cc_wrap()

这是我的 6 核机器上的时间:

# Initial (sequential):      8.1 s
# Initial (parallel):        9.0 s
# Array-based (sequential):  2.50 s
# Array-based (parallel):    0.41 s
# In-place (sequential):     1.09 s
# In-place (parallel):       0.19 s

最终,最快的并行版本比原始版本快 47 倍(并且几乎完美扩展)。