Fastest way to concatenate slices of numpy array
I have a large number of small numpy arrays (groups) of varying sizes, and I want to concatenate an arbitrary subset of these groups as fast as possible. The solution I originally came up with is to store the groups as an np.array of np.array objects and access subsets of groups with list indexing:
import numpy as np

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size=size))
groups = np.array(groups)  # dtype=np.object
indices = np.random.randint(len(groups), size=1000)
%%timeit
np.concatenate(groups[indices])
>>> 204 µs ± 395 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
However, this solution is inefficient in terms of memory consumption: the groups are small (two elements on average), yet I have to store a full numpy array structure for each group, which is almost 100 bytes per group (too much for me).
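A rough way to see that per-array overhead (an illustrative check only; exact numbers vary with the NumPy and CPython versions, and sys.getsizeof only accounts for the buffer when the array owns its data) is to compare an array's reported object size with the size of its payload:

import sys
import numpy as np

g = np.random.randint(1000000, size=2)      # a typical two-element group
payload = g.nbytes                          # 16 bytes of actual int64 data
overhead = sys.getsizeof(g) - payload       # ndarray object/header overhead, roughly 100 bytes
print(payload, overhead)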
To make the solution more memory-efficient, I decided to concatenate all groups and store the group boundaries in a separate array:
data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups])
# ith group is data[offsets[i]: offsets[i + 1]]
But now the concatenation is not trivial at all. Doing it like this:
%%timeit
np.concatenate([data[offsets[i]: offsets[i + 1]] for i in indices])
>>> 1.02 ms ± 44.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
is 5 times slower than the original solution. I think this is due to two things. First, the iteration over the numpy array of indices (python wraps every c-int index into an object). Second, python creates a numpy structure for every slice/index. I don't think the concatenation time for this case can be reduced in pure python, so I decided to come up with a cython solution:
%%cython
import numpy as np
ctypedef long long int64

def concatenate(data, offsets, indices):
    cdef int64[::] data_view = data
    cdef int64[::] indices_view = indices
    cdef int64[::] offsets_view = offsets
    size = np.sum(offsets[indices + 1]) - np.sum(offsets[indices])
    res = np.zeros(size, dtype=np.int64)
    cdef int64[::] res_view = res
    cdef int64 i, l = 0, r
    for i in indices_view:
        r = l + offsets_view[i + 1] - offsets_view[i]
        res_view[l: r] = data_view[offsets_view[i]: offsets_view[i + 1]]
        l = r
    return res
%%timeit
concatenate(data, offsets, indices)
>>> 277 µs ± 89.8 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
This solution is faster than the previous one, but still somewhat slower than the original. The biggest problem, though, is that I don't know the data type in advance. I used int64 in the example, but it could be any numeric type, e.g. float32. Because of that, I can't use typed memoryviews the way I did here. In theory, I only need to know the size of the type (4/8 bytes); if I had pointers to the data and result arrays, I could use memcpy or something similar to copy the slices. But I don't know how to do that in cython. Is there a way?
I found a way to concatenate slices of arrays of arbitrary dtype with cython. There is a C counterpart of the python numpy.ndarray class. It contains a pointer data to the beginning of the underlying C array and an attribute itemsize that stores the size of a single element in bytes. With these, the slices can be concatenated using memcpy:
import numpy as np
cimport numpy as np
cimport cython
from libc.string cimport memcpy

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate(np.ndarray data, np.ndarray offsets, np.ndarray indices):
    data = np.ascontiguousarray(data)
    start_offsets = np.ascontiguousarray(offsets[indices], dtype=np.int64)
    end_offsets = np.ascontiguousarray(offsets[indices + 1], dtype=np.int64)
    cdef np.int64_t[::1] coffsets = start_offsets
    cdef np.int64_t[::1] csizes = end_offsets - start_offsets
    cdef np.int64_t i, total_size = 0
    for i in range(csizes.shape[0]):
        total_size += csizes[i]
    res = np.empty(total_size, dtype=data.dtype)
    cdef np.ndarray cdata = data
    cdef np.ndarray cres = res
    cdef np.int64_t itemsize = data.itemsize
    cdef np.int64_t res_offset = 0
    for i in range(csizes.shape[0]):
        memcpy(cres.data + res_offset * itemsize,
               cdata.data + coffsets[i] * itemsize,
               csizes[i] * itemsize)
        res_offset += csizes[i]
    return res
%%timeit
concatenate(data, offsets, indices)
>>> 21.1 µs ± 24.7 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
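Since the copying loop works on raw bytes via memcpy and the output is allocated with dtype=data.dtype, the function should be dtype-agnostic. A quick sanity check one could run (this snippet is illustrative and assumes the data, offsets and indices variables from the question's setup, not part of the original answer):

# Check the memcpy-based concatenate on a float32 copy of the same data.
data_f32 = data.astype(np.float32)
out = concatenate(data_f32, offsets, indices)
expected = np.concatenate([data_f32[offsets[i]: offsets[i + 1]] for i in indices])
assert out.dtype == np.float32
assert np.array_equal(out, expected)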
Here is my pure numpy-only solution, the adv_concatenate() function. It gives a 15x-47x speedup (varies between machines) compared to regular np.concatenate().

Note: there is a second, even faster solution after this first one, further below.

For time measurement the pip module timerit is used; it is installed once via python -m pip install timerit. Two kinds of machines were used for timing: the first one is Windows-based and is the same for all tests (my home laptop); the second one is Linux-based and is a different machine for each test (so the speed differs between tests, but is the same within one run/test). I used machines from the repl.it site to test my code.
The idea of the algorithm is to use numpy's cumulative sum function (.cumsum()); a small worked example follows the list below:

- We create an array of only 1s whose size equals the total size of the resulting concatenated data array. This array will hold the indexes of all data elements to fetch in order to build the resulting data array.
- In this array, at the starting position of each chunk (small sub-array), we change the value in such a way that after running cumsum() over the whole array, this starting value is converted into the starting offset into the data array. The remaining values stay 1s.
- We apply .cumsum() to this array. Now all values hold the correct indexes of the data elements to fetch.
- We form the resulting fetched data array by indexing data with the index array formed above.
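To make the trick concrete, here is a tiny hand-checked example (the data, offsets and indices values are made up for illustration and are not from the benchmark below):

import numpy as np

# data holds three groups: [10, 11], [12], [13, 14, 15]
data = np.array([10, 11, 12, 13, 14, 15])
offsets = np.array([0, 2, 3, 6])
indices = np.array([2, 0])                            # we want group 2 followed by group 0

begs, ends = offsets[indices], offsets[indices + 1]   # [3, 0], [6, 2]
lens = ends - begs                                    # [3, 2]
clens = lens.cumsum()                                 # [3, 5]

ix = np.ones(clens[-1], dtype=offsets.dtype)          # [1, 1, 1, 1, 1]
ix[0] = begs[0]                                       # [3, 1, 1, 1, 1]
ix[clens[:-1]] = begs[1:] - ends[:-1] + 1             # [3, 1, 1, -5, 1]
ix = ix.cumsum()                                      # [3, 4, 5, 0, 1]

print(data[ix])                                       # [13 14 15 10 11]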
This algorithm could probably get an even bigger boost if some values, like offsets[1:] - offsets[:-1] or offsets[:-1] + 1, were pre-computed and reused inside the adv_concatenate() function; a sketch of such a variant is shown right below.
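The following is my sketch of what that pre-computed variant might look like (assumed, not from the original answer): the per-group lengths are computed once outside the hot path and passed in, which avoids the offsets[indices + 1] fancy-indexing call.

import numpy as np

def adv_concatenate_pre(data, offsets, group_lens, indices):
    # group_lens = offsets[1:] - offsets[:-1], computed once and reused across calls
    begs = offsets[indices]
    lens = group_lens[indices]      # replaces offsets[indices + 1] - offsets[indices]
    ends = begs + lens
    clens = lens.cumsum()
    ix = np.ones((clens[-1],), dtype=offsets.dtype)
    ix[0] = begs[0]
    ix[clens[:-1]] = begs[1:] - ends[:-1] + 1
    return data[ix.cumsum()]

# Usage with the benchmark data defined below:
# group_lens = offsets[1:] - offsets[:-1]
# adv_concatenate_pre(data, offsets, group_lens, indices)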
# Needs: python -m pip install numpy timerit
from timerit import Timerit
import numpy as np

np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size))
groups = np.array(groups) # dtype=np.object
indices = np.random.randint(len(groups), size = 1000)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups])

timer = lambda: Timerit(num = 600, verbose = 1)

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

def adv_concatenate(data, offsets, indices):
    begs, ends = offsets[indices], offsets[indices + 1]
    lens = ends - begs
    clens = lens.cumsum()
    ix = np.ones((clens[-1],), dtype = offsets.dtype)
    ix[0] = begs[0]
    ix[clens[:-1]] = begs[1:] - ends[:-1] + 1
    ix = ix.cumsum()
    return data[ix]

print('adv_concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv = adv_concatenate(data, offsets, indices)
tadv = tim.mean()

assert np.array_equal(ref, adv) # Check that our solution is correct

print('speedup:', round(tref / tadv, 3))
Output:

On the first machine (Windows):
np.concatenate(): Timed best=3.129 ms, mean=3.225 +- 0.1 ms
adv_concatenate(): Timed best=191.137 us, mean=208.012 +- 20.7 us
speedup: 15.504
On the second machine (Linux):
np.concatenate(): Timed best=1.666 ms, mean=2.314 +- 0.4 ms
adv_concatenate(): Timed best=35.596 us, mean=48.680 +- 15.4 us
speedup: 47.532
The second solution is even faster than the first one, giving a 40x-150x speedup (varies between machines) compared to regular np.concatenate(). But it uses Numba, a JIT LLVM-based compiler, which needs to be installed via python -m pip install numba.

Although it uses the extra numba package, the central function adv_concatenate_indexes_numba() is very simple, with about the same number of lines of code as in the first solution. The algorithm is also much simpler: just two plain loops.

The current solution works for any data type, because the central function only computes the resulting indexes and therefore does not touch the dtype of data at all. It could be boosted by another 10%-90% if, instead of computing indexes, the numba function computed the resulting data array directly, but that works only for quite simple data types supported by numba, including all numeric types. Here is the code (or here) of this improved solution, which achieves up to a 250x speedup! Timings of this improved version on the second machine (Linux):
np.concatenate(): Timed best=1.640 ms, mean=3.403 +- 1.9 ms
adv_concatenate_numba(): Timed best=12.669 us, mean=17.235 +- 6.9 us
speedup: 197.46
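For reference, a direct-data variant along the lines described above might look roughly like the sketch below. This is my reconstruction of the idea, specialized to int64 data as the text describes, not the code behind the links above:

import numpy as np, numba

@numba.njit('i8[:](i8[:], i8[:], i8[:])', cache = True)
def adv_concatenate_numba(data, offsets, indices):
    # First pass: total length of the selected groups.
    tlen = 0
    for i in range(indices.size):
        ix = indices[i]
        tlen += offsets[ix + 1] - offsets[ix]
    # Second pass: copy the selected elements directly into the result.
    pos, r = 0, np.empty(tlen, dtype = np.int64)
    for i in range(indices.size):
        ix = indices[i]
        for j in range(offsets[ix], offsets[ix + 1]):
            r[pos] = data[j]
            pos += 1
    return r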
Next comes the code of the more universal (only-indexes-computing) solution:
# Needs: python -m pip install numpy numba timerit
from timerit import Timerit
import numpy as np, numba

np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size, dtype = np.int64))
groups = np.array(groups) # dtype=np.object
indices = np.random.randint(len(groups), size = 1000, dtype = np.int64)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups], dtype = np.int64)

timer = lambda: Timerit(num = 600, verbose = 1)

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

@numba.njit('i8[:](i8[:], i8[:])', cache = True)
def adv_concatenate_indexes_numba(offsets, indices):
    tlen = 0
    for i in range(indices.size):
        ix = indices[i]
        tlen += offsets[ix + 1] - offsets[ix]
    pos, r = 0, np.empty((tlen,), dtype = offsets.dtype)
    for i in range(indices.size):
        ix = indices[i]
        for j in range(offsets[ix], offsets[ix + 1]):
            r[pos] = j
            pos += 1
    return r

def adv_concatenate2(data, offsets, indices):
    return data[adv_concatenate_indexes_numba(offsets, indices)]

adv_concatenate2(data, offsets, indices) # Once pre-compile Numba

print('adv_concatenate2(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv = adv_concatenate2(data, offsets, indices)
tadv = tim.mean()

assert np.array_equal(ref, adv) # Check that our solution is correct

print('speedup:', round(tref / tadv, 3))
Output:

On the first machine (Windows):
np.concatenate(): Timed best=3.201 ms, mean=3.356 +- 0.1 ms
adv_concatenate2(): Timed best=79.681 us, mean=82.991 +- 6.7 us
speedup: 40.442
On the second machine (Linux):
np.concatenate(): Timed best=1.541 ms, mean=2.220 +- 0.7 ms
adv_concatenate2(): Timed best=12.012 us, mean=14.830 +- 4.8 us
speedup: 149.716
Inspired by the Cython code of @pavelgramovich, I also decided to implement a simplified version that uses a plain copying loop (func concatenate1()) instead of the memcpy() version (func concatenate0()); for the current test data the simplified version appears to be 1.5-2x faster than the memcpy version. Full code comparing the two versions:
# Needs: python -m pip install numpy timerit cython setuptools
from timerit import Timerit
import numpy as np

np.random.seed(0)
Timerit._default_asciimode = True

groups = []
for i in range(100000):
    size = np.random.randint(3) + 1
    groups.append(np.random.randint(1000000, size = size, dtype = np.int64))
groups = np.array(groups) # dtype=np.object
indices = np.random.randint(len(groups), size = 1000, dtype = np.int64)

data = np.concatenate(groups)
offsets = np.cumsum([0] + [len(group) for group in groups], dtype = np.int64)

timer = lambda: Timerit(num = 600, verbose = 1)

def compile_cy_cats():
    src = """
import numpy as np
cimport numpy as np
cimport cython
from libc.string cimport memcpy

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate0(np.ndarray data, np.ndarray offsets, np.ndarray indices):
    data = np.ascontiguousarray(data)
    start_offsets = np.ascontiguousarray(offsets[indices], dtype=np.int64)
    end_offsets = np.ascontiguousarray(offsets[indices + 1], dtype=np.int64)
    cdef np.int64_t[::1] coffsets = start_offsets
    cdef np.int64_t[::1] csizes = end_offsets - start_offsets
    cdef np.int64_t i, total_size = 0
    for i in range(csizes.shape[0]):
        total_size += csizes[i]
    res = np.empty(total_size, dtype=data.dtype)
    cdef np.ndarray cdata = data
    cdef np.ndarray cres = res
    cdef np.int64_t itemsize = data.itemsize
    cdef np.int64_t res_offset = 0
    for i in range(csizes.shape[0]):
        memcpy(cres.data + res_offset * itemsize,
               cdata.data + coffsets[i] * itemsize,
               csizes[i] * itemsize)
        res_offset += csizes[i]
    return res

@cython.boundscheck(False)
@cython.wraparound(False)
def concatenate1(np.int64_t[:] data, np.int64_t[:] offsets, np.int64_t[:] indices):
    cdef np.int64_t tlen = 0, pos = 0, ix = 0, ixs = indices.size, i = 0, j = 0
    for i in range(ixs):
        ix = indices[i]
        tlen += offsets[ix + 1] - offsets[ix]
    r = np.empty(tlen, dtype = np.int64)
    cdef np.int64_t[:] cr = r, cdata = data
    for i in range(ixs):
        ix = indices[i]
        for j in range(offsets[ix], offsets[ix + 1]):
            cr[pos] = cdata[j]
            pos += 1
    return r
    """
    srcb = src.encode('utf-8')
    import hashlib, os, glob, importlib
    srch = hashlib.sha256(srcb).hexdigest().upper()[:8]
    if len(glob.glob(f'cy{srch}*')) == 0:
        with open(f'cys{srch}.pyx', 'wb') as f:
            f.write(srcb)
        import sys
        from setuptools import setup, Extension
        from Cython.Build import cythonize
        import numpy as np
        sys.argv += ['build_ext', '--inplace']
        setup(
            ext_modules = cythonize(
                Extension(f'cy{srch}', [f'cys{srch}.pyx']), language_level = 3, annotate = True,
            ),
            include_dirs = [np.get_include()],
        )
        del sys.argv[-2:]
    print('Cython module:', f'cy{srch}')
    return importlib.import_module(f'cy{srch}')

cy_cats = compile_cy_cats()
concatenate0, concatenate1 = cy_cats.concatenate0, cy_cats.concatenate1

print('np.concatenate(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        ref = np.concatenate([data[offsets[i] : offsets[i + 1]] for i in indices])
tref = tim.mean()

concatenate0(data, offsets, indices) # Maybe pre-heat

print('cy_concatenate0(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv0 = concatenate0(data, offsets, indices)
tadv0 = tim.mean()

assert np.array_equal(ref, adv0) # Check that our solution is correct

print('speedup:', round(tref / tadv0, 3))

concatenate1(data, offsets, indices) # Maybe pre-heat

print('cy_concatenate1(): ', end = '', flush = True)
tim = timer()
for t in tim:
    with t:
        adv1 = concatenate1(data, offsets, indices)
tadv1 = tim.mean()

assert np.array_equal(ref, adv1) # Check that our solution is correct

print('speedup:', round(tref / tadv1, 3))
Output:

On the first machine (Windows):
Cython module: cy0BEBA0C8
np.concatenate(): Timed best=3.184 ms, mean=3.263 +- 0.1 ms
cy_concatenate0(): Timed best=119.767 us, mean=128.688 +- 10.7 us
speedup: 25.354
cy_concatenate1(): Timed best=86.525 us, mean=93.699 +- 20.5 us
speedup: 34.821
On the second machine (Linux):
Cython module: cy0BEBA0C8
np.concatenate(): Timed best=1.630 ms, mean=2.215 +- 0.5 ms
cy_concatenate0(): Timed best=21.839 us, mean=28.930 +- 8.4 us
speedup: 76.578
cy_concatenate1(): Timed best=11.447 us, mean=15.263 +- 5.1 us
speedup: 145.151