连接numpy数组切片的最快方法

Fastest way to concatenate slices of numpy array

我有大量不同大小的小型 numpy 数组(组),我想尽快连接这些组的任意子集。我最初想出的解决方案是将这些组存储为 np.array 个 np.array,然后使用列表索引访问组的子集:

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size=size))
groups = np.array(groups)  # dtype=np.object
indices = np.random.randint(len(groups), size=1000)

%%timeit
np.concatenate(groups[indices])
>>> 204 µs ± 395 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

但是,这个解决方案在内存消耗方面效率低下,因为组很小(平均 2 个元素),我必须为每个组存储一个 numpy 数组结构,这将近 100 个字节(对我来说太多了) .

为了提高解决方案的效率,我决定连接所有组并将数组边界存储在单独的数组中

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups])
# ith group is data[offsets[i]: offsets[i + 1]]

但是连接起来一点也不明显。像这样:

%%timeit
np.concatenate([data[offsets[i]: offsets[i + 1]] for i in indices])
>>> 1.02 ms ± 44.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

比原始解决方案慢 5 倍。我认为这是因为两件事。首先,迭代 numpy 数组索引(python 将每个索引的 c-int 包装到对象中)。其次,python 为每个 slice/index 创建 numpy 结构。我认为在纯 python 中减少这种情况的连接时间是不可能的,所以我决定想出一个 cython 解决方案。

%%cython
import numpy as np
ctypedef long long int64

def concatenate(data, offsets, indices):
    cdef int64[::] data_view = data
    cdef int64[::] indices_view = indices
    cdef int64[::] offsets_view = offsets
    
    size = np.sum(offsets[indices + 1]) - np.sum(offsets[indices])
    res = np.zeros(size, dtype=np.int64)
    cdef int64[::] res_view = res
    
    cdef int64 i, l = 0, r
    for i in indices_view:
        r = l + offsets_view[i + 1] - offsets_view[i]
        res_view[l: r] = data_view[offsets_view[i]: offsets_view[i + 1]]
        l = r
    return res

%%timeit
concatenate(data, offsets, indices)
>>> 277 µs ± 89.8 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

这个解决方案比以前的解决方案更快,但仍然比原来的要慢一点。但最大的问题是我事先不知道数据类型。我在示例中使用了 int64,但它可以是任何数字类型,例如浮动32。因此,我不能像以前那样使用类型化的内存视图。理论上,我只需要知道类型的大小(4/8 字节),如果我有指向数据和结果数组的指针,我可以使用 memcpy 或类似的东西来复制切片。但我不知道如何在 cython 中做到这一点。有办法吗?

我找到了一种将任意 dtype 数组的切片与 cython 连接起来的方法。有一个c-counterpart一个pythonnumpy.ndarrayclass。它包含指向底层 c-array 和属性 itemsize 开头的指针 data,它以字节为单位存储单个元素的大小。这样,就可以使用 memcpy.

连接切片
import numpy as np
cimport numpy as np
cimport cython
from libc.string cimport memcpy 

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate(np.ndarray data, np.ndarray offsets, np.ndarray indices):
    data = np.ascontiguousarray(data)
    start_offsets = np.ascontiguousarray(offsets[indices], dtype=np.int64)
    end_offsets = np.ascontiguousarray(offsets[indices + 1], dtype=np.int64)
    cdef np.int64_t[::1] coffsets = start_offsets
    cdef np.int64_t[::1] csizes = end_offsets - start_offsets
    
    cdef np.int64_t i, total_size = 0
    for i in range(csizes.shape[0]):
        total_size += csizes[i]
    res = np.empty(total_size, dtype=data.dtype)

    cdef np.ndarray cdata = data
    cdef np.ndarray cres = res
    
    cdef np.int64_t itemsize = data.itemsize
    cdef np.int64_t res_offset = 0
    for i in range(csizes.shape[0]):
        memcpy(cres.data + res_offset * itemsize, 
               cdata.data + coffsets[i] * itemsize, 
               csizes[i] * itemsize)
        res_offset += csizes[i]
    
    return res

%%timeit
concatenate(data, offsets, indices)
>>> 21.1 µs ± 24.7 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

这是我的纯 numpy-only 解决方案,adv_concatenate() 函数。与常规 np.concatenate().

相比,它提供 15x-47x 倍的加速(在不同机器上不同)

注意:在第一个代码之后还有第二个更快的解决方案。

用于使用的pip模块timerit的时间测量,由python -m pip install timerit安装一次。对于计时,使用了两种类型的机器——第一台机器是 Windows-based,所有测试都是一样的,它是我的家用笔记本电脑,第二台机器是 Linux-based,每次测试都是不同的机器(如此不同不同测试之间的速度,但一个 run/test 内的速度相同),我也用 repl.it 站点的机器来测试我的代码。

算法思路是利用numpy的累加函数(.cumsum()):

  1. 我们创建了只有 1s 的数组,数组的大小等于生成的串联数据数组的总大小。该数组将保存要获取的所有 data 个元素的索引,以创建结果数据数组。
  2. 在这个数组中,在每个块的起始位置(小 sub-array),我们将值更改为这样的方式,即在 运行 cumsum() 之后,在整个结果数组上,这个起始值是转换为 data 数组的起始偏移量。剩余值保持 1s.
  3. 我们对这个数组应用 .cumsum()。现在所有值都将保存要获取的数据元素的正确索引。
  4. 我们通过用上面形成的索引数组索引 data 来形成结果获取的数据数组。

如果我们 pre-compute 一些值,如 offsets[1:] - offsets[:-1]offsets[:-1] + 1 并在 adv_concatenate() 函数中使用它们,则该算法可能会得到更大的提升。

Try it online!

# Needs: python -m pip install numpy timerit
from timerit import Timerit
import numpy as np
np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size))
groups = np.array(groups)  # dtype=np.object
indices = np.random.randint(len(groups), size = 1000)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups])

timer = lambda: Timerit(num = 600, verbose = 1)

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

def adv_concatenate(data, offsets, indices):
    begs, ends = offsets[indices], offsets[indices + 1]
    lens = ends - begs
    clens = lens.cumsum()
    ix = np.ones((clens[-1],), dtype = offsets.dtype)
    ix[0] = begs[0]
    ix[clens[:-1]] = begs[1:] - ends[:-1] + 1
    ix = ix.cumsum()
    return data[ix]
    
print('adv_concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv = adv_concatenate(data, offsets, indices)
tadv = tim.mean()
assert np.array_equal(ref, adv) # Check that our solution is correct

print('speedup:', round(tref / tadv, 3))

输出:

在第一台机器上 (Windows):

np.concatenate(): Timed best=3.129 ms, mean=3.225 +- 0.1 ms
adv_concatenate(): Timed best=191.137 us, mean=208.012 +- 20.7 us
speedup: 15.504

在第二台机器上(Linux):

np.concatenate(): Timed best=1.666 ms, mean=2.314 +- 0.4 ms
adv_concatenate(): Timed best=35.596 us, mean=48.680 +- 15.4 us
speedup: 47.532

第二个解决方案比第一个解决方案更快,与常规 np.concatenate() 相比,提供 40x-150x 倍的加速(不同机器上不同)。但第二种解决方案使用基于 Numba JIT LLVM 的编译器,需要通过 python -m pip install numba.

安装

虽然它使用了额外的numba包,但核心功能adv_concatenate_indexes_numba()非常简单,代码行数与第一个解决方案相同。算法也简单得多,两个简单的循环。

当前解决方案适用于任何数据类型,因为中心函数只计算结果索引,因此根本不适用于 data 的 dtype。如果不是计算索引,numba 函数将直接计算结果数据数组,那么当前的解决方案也可以通过 10%-90% 更多提升,但这仅适用于 numba 支持的非常简单的数据类型,包括所有数字类型。 Here is the code (or here) 此改进的解决方案可实现高达 250x 的加速!这个改进版本在第二台机器上的时间(Linux):

np.concatenate(): Timed best=1.640 ms, mean=3.403 +- 1.9 ms
adv_concatenate_numba(): Timed best=12.669 us, mean=17.235 +- 6.9 us
speedup: 197.46

接下来是更通用的 (only-indexes-computing) 解决方案代码:

Try it online!

# Needs: python -m pip install numpy numba timerit
from timerit import Timerit
import numpy as np, numba
np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size, dtype = np.int64))
groups = np.array(groups)  # dtype=np.object
indices = np.random.randint(len(groups), size = 1000, dtype = np.int64)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups], dtype = np.int64)

timer = lambda: Timerit(num = 600, verbose = 1)

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

@numba.njit('i8[:](i8[:], i8[:])', cache = True)
def adv_concatenate_indexes_numba(offsets, indices):
    tlen = 0
    for i in range(indices.size):
        ix = indices[i]
        tlen += offsets[ix + 1] - offsets[ix]
        
    pos, r = 0, np.empty((tlen,), dtype = offsets.dtype)
    for i in range(indices.size):
        ix = indices[i]
        for j in range(offsets[ix], offsets[ix + 1]):
            r[pos] = j
            pos += 1
            
    return r
    
def adv_concatenate2(data, offsets, indices):
    return data[adv_concatenate_indexes_numba(offsets, indices)]
    
adv_concatenate2(data, offsets, indices) # Once pre-compile Numba
print('adv_concatenate2(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv = adv_concatenate2(data, offsets, indices)
tadv = tim.mean()
assert np.array_equal(ref, adv) # Check that our solution is correct

print('speedup:', round(tref / tadv, 3))

输出:

在第一台机器上 (Windows):

np.concatenate(): Timed best=3.201 ms, mean=3.356 +- 0.1 ms
adv_concatenate2(): Timed best=79.681 us, mean=82.991 +- 6.7 us
speedup: 40.442

在第二台机器上(Linux):

np.concatenate(): Timed best=1.541 ms, mean=2.220 +- 0.7 ms
adv_concatenate2(): Timed best=12.012 us, mean=14.830 +- 4.8 us
speedup: 149.716

受到@pavelgramovich 的 Cython 代码的启发 我还决定使用循环 (func concatenate1()) 而不是 memcpy() 版本 (func [=46) 来实现我的简化版本=]),对于当前测试数据,简化版本似乎比 memcpy 版本快 1.5-2x。比较以下两个版本的完整代码:

Try it online!

# Needs: python -m pip install numpy timerit cython setuptools
from timerit import Timerit
import numpy as np
np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size, dtype = np.int64))
groups = np.array(groups)  # dtype=np.object
indices = np.random.randint(len(groups), size = 1000, dtype = np.int64)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups], dtype = np.int64)

timer = lambda: Timerit(num = 600, verbose = 1)

def compile_cy_cats():
    src = """
import numpy as np
cimport numpy as np
cimport cython
from libc.string cimport memcpy 

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate0(np.ndarray data, np.ndarray offsets, np.ndarray indices):
    data = np.ascontiguousarray(data)
    start_offsets = np.ascontiguousarray(offsets[indices], dtype=np.int64)
    end_offsets = np.ascontiguousarray(offsets[indices + 1], dtype=np.int64)
    cdef np.int64_t[::1] coffsets = start_offsets
    cdef np.int64_t[::1] csizes = end_offsets - start_offsets
    
    cdef np.int64_t i, total_size = 0
    for i in range(csizes.shape[0]):
        total_size += csizes[i]
    res = np.empty(total_size, dtype=data.dtype)

    cdef np.ndarray cdata = data
    cdef np.ndarray cres = res
    
    cdef np.int64_t itemsize = data.itemsize
    cdef np.int64_t res_offset = 0
    for i in range(csizes.shape[0]):
        memcpy(cres.data + res_offset * itemsize, 
               cdata.data + coffsets[i] * itemsize, 
               csizes[i] * itemsize)
        res_offset += csizes[i]
    
    return res

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate1(np.int64_t[:] data, np.int64_t[:] offsets, np.int64_t[:] indices):
    cdef np.int64_t tlen = 0, pos = 0, ix = 0, ixs = indices.size, i = 0, j = 0
    
    for i in range(ixs):
        ix = indices[i]
        tlen += offsets[ix + 1] - offsets[ix]
        
    r = np.empty(tlen, dtype = np.int64)
    cdef np.int64_t[:] cr = r, cdata = data

    for i in range(ixs):
        ix = indices[i]
        for j in range(offsets[ix], offsets[ix + 1]):
            cr[pos] = cdata[j]
            pos += 1
    
    return r
    """
    
    srcb = src.encode('utf-8')
    
    import hashlib, os, glob, importlib
    srch = hashlib.sha256(srcb).hexdigest().upper()[:8]

    if len(glob.glob(f'cy{srch}*')) == 0:
        with open(f'cys{srch}.pyx', 'wb') as f:
            f.write(srcb)

        import sys
        from setuptools import setup, Extension
        from Cython.Build import cythonize
        import numpy as np

        sys.argv += ['build_ext', '--inplace']
        setup(
            ext_modules = cythonize(
                Extension(f'cy{srch}', [f'cys{srch}.pyx']), language_level = 3, annotate = True,
            ),
            include_dirs = [np.get_include()],
        )
        del sys.argv[-2:]

    print('Cython module:', f'cy{srch}')
    return importlib.import_module(f'cy{srch}')

cy_cats = compile_cy_cats()
concatenate0, concatenate1 = cy_cats.concatenate0, cy_cats.concatenate1

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

concatenate0(data, offsets, indices) # Maybe pre-heat
print('cy_concatenate0(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv0 = concatenate0(data, offsets, indices)
tadv0 = tim.mean()
assert np.array_equal(ref, adv0) # Check that our solution is correct

print('speedup:', round(tref / tadv0, 3))

concatenate1(data, offsets, indices) # Maybe pre-heat
print('cy_concatenate1(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv1 = concatenate1(data, offsets, indices)
tadv1 = tim.mean()
assert np.array_equal(ref, adv1) # Check that our solution is correct

print('speedup:', round(tref / tadv1, 3))

输出:

第一台机器(Windows):

Cython module: cy0BEBA0C8
np.concatenate(): Timed best=3.184 ms, mean=3.263 +- 0.1 ms
cy_concatenate0(): Timed best=119.767 us, mean=128.688 +- 10.7 us
speedup: 25.354
cy_concatenate1(): Timed best=86.525 us, mean=93.699 +- 20.5 us
speedup: 34.821

第二台机器(Linux):

Cython module: cy0BEBA0C8
np.concatenate(): Timed best=1.630 ms, mean=2.215 +- 0.5 ms
cy_concatenate0(): Timed best=21.839 us, mean=28.930 +- 8.4 us
speedup: 76.578
cy_concatenate1(): Timed best=11.447 us, mean=15.263 +- 5.1 us
speedup: 145.151