通过 bytearray 从 ndarray 选择行

Selecting rows from ndarray via bytearray

我有一个从 redis 中提取的字节数组。

r.set('a', '')
r.setbit('a', 0, 1)
r.setbit('a', 1, 1)
r.setbit('a', 12, 1)

a_raw = db.get('a')
# b'\xc0\x08'
a_bin = bin(int.from_bytes(a, byteorder="big")) 
# 0b1100000000001000

我想将该字节数组用于 ndarray 中的 select 行。

arr = np.arange(12)
arr[a_raw]
# array([0, 1, 12])

编辑 两种解决方案都有效,但我发现@paul-panzer 的速度更快

import timeit

setup = '''import numpy as np; a = b'\xc0\x08'; '''

t1 = timeit.timeit('idx = np.unpackbits(np.frombuffer(a, np.uint8)); np.where(idx)', 
              setup = setup, number=10000)

t2 = timeit.timeit('idx = np.array(list(bin(int.from_bytes(a, byteorder="big"))[2:])) == "1"; np.where(idx)',
              setup = setup, number=10000)

print(t1, t2)
#0.019560601096600294 0.054518797900527716

编辑 2 实际上,from_bytes 方法并没有 return 我要找的东西:

redis_db.delete('timeit_test')
redis_db.setbit('timeit_test', 12666, 1)
redis_db.setbit('timeit_test', 14379, 1)
by = redis_db.get('timeit_test')

idx = np.unpackbits(np.frombuffer(by, np.uint8))
indices = np.where(idx)

idx = np.array(list(bin(int.from_bytes(by, byteorder="big"))[2:])) == "1"
indices_2 = np.where(idx)

print(indices, indices_2)
#(array([12666, 14379]),) (array([   1, 1714]),)

这是一种方法:

In [57]: b = 0b1100000000001000

In [58]: mask = np.array(list(bin(b)[2:])) == '1'

In [59]: arr = np.arange(13)

In [60]: arr[mask[:len(arr)]]
Out[60]: array([ 0,  1, 12])

此外,这是一个简单的检查来证明 ndarray__getitem__ 实现不支持直接在 bytes 对象上建立索引:

In [61]: by = b'\xc0\x08'

In [62]: arr[by]
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-111-6cd68003b176> in <module>()
----> 1 arr[by]

IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) 
and integer or boolean arrays are valid indices

因此,除非您继承 ndarray 或创建具有自定义 __getitem__ 行为的扩展模块,否则无法直接从 bytes 执行此操作,您必须转换字节根据按位条件转换为布尔掩码。

这是一个比较直接从原始 bytes 对象工作的几种不同方法的时间的例子:

In [171]: %timeit np.array(list(bin(int.from_bytes(by, byteorder='big'))[2:])) == '1'
3.51 µs ± 38 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [172]: %timeit np.unpackbits(np.frombuffer(by, np.uint8))
2.05 µs ± 29.59 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [173]: %timeit np.array(list(bin(struct.unpack('>H', by)[0])[2:])) == '1'
2.65 µs ± 6.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

下面是使用 unpackbits 的方法:

>>> a = b'\xc0\x08'
>>> b = np.arange(32).reshape(16, 2)
>>> c = np.arange(40).reshape(20, 2)
>>> 
>>> idx = np.unpackbits(np.frombuffer(a, np.uint8))
>>> 
# if the sizes match boolen indexing can be used
>>> b[idx.view(bool)]
array([[ 0,  1],
       [ 2,  3],
       [24, 25]])
>>> 
# non matching sizes can be worked around using where
>>> c[np.where(idx)]
array([[ 0,  1],
       [ 2,  3],
       [24, 25]])
>>>