从 Python 中的数据流中提取携带信息的 12 位

Extract 12 bits that carry information from a data stream in Python

我正在研究EEG(时间序列采集设备)串行驱动的实现。该设备使用 12 位 x 26 个总通道对数据进行编码,采样率为 200Hz

串行数据流由信号字节 0xA0 和 45 个字节组成,这些字节携带 26 个通道的数据,每个通道用 12 位编码。

但这里有一个问题,这 12 位在 45 字节块中的位置并不固定。第一个 bye 仅使用 4 LSB,而其余 44 个 7 LSB。

为了更加说明这一点,我将尝试在下面以图形方式表示它。假设我们已经启动了放大器,它总是为所有通道提供 4095(用 12 位表示的最大 int 值)(所以我们的数据都是“1”),那么我们有这样的东西:

a0 0f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f a0 下一个样本...

这必须映射到值为 4095 的 int(1,...,26)。

所以,我做了一个 python 代码,首先找到块的开头,然后将所有内容保存在一个 int/long 中,然后我删除了固定位置的位,最多追加 8有效的 0 位以构成 16 位表示并将字节数组转换为整数列表。

可以正常工作,但问题是速度。似乎代码为单个样本花费了相当多的时间,并且它必须在一秒钟内完成 200 次。让我们包括一些真正的串行读取方法的其他延迟,对于所有 200 个样本,一切都必须保持在 1 秒以下

#Python code
def readByte():
#mockup
    return 0xA0

def read45bytes():
    return      int(0x0f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f)


def remove_bit(num, i):
    mask = num >> (i + 1)
    mask = mask << i
    right = ((1 << i) - 1) & num
    return mask | right

def insert_mult_bits(num, bits, len, i):
    mask = num >> i
    mask = (mask << len) | bits
    mask = mask << i
    right = ((1 << i) - 1) & num
    return right | mask



def main():

    while(readByte()!=0xA0):
        print("Searching for the beginning of the packet of 45 bytes...")

    print("Beginning of the packet of 45 bytes found")



    #read whole sample
    sample=read45bytes()

    #remove unused bits
    corr=0;
    for i in range(7, sample.bit_length(), 8):
        sample=remove_bit(sample,i-corr);
        corr=corr+1;

    #add HSB to make 2byte representation
    corr=0;
    for i in range(12,sample.bit_length(),12):
        sample=insert_mult_bits(sample,0,4,i+corr)
        corr=corr+4;

    #convert to bytes 26channels x 2 bytes, bigendian
    bt=sample.to_bytes(26*2,'big');

    #assign the result to int list
    idx=0;
    out=[];
    for i in range(0,26*2-1,2):
        out.append(int(int((bt[i]<<8 | bt[i+1]))))
        idx=idx+1;

    #print first sample of the channel 1
    print(out.pop(0))

code00.py:

#!/usr/bin/env python

import sys
import math
import io
import itertools as it


START_MARKER = b"\xA0"
START_MARKER_LEN = len(START_MARKER)

BIT_VALUE_MASK = list(2 ** i for i in range(7, -1, -1))

IGNORED_BITS_INDEXES = (7,)


def chunk_size(channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
    libi = len(ignored_bits_indexes)
    #if libi > 7:
    #    raise ValueError
    bits = channel_count * bits_per_channel
    bpb = 8 - libi
    q, r = divmod(bits, bpb)
    r += ignored_heading_bits
    return q + math.ceil(r / 8)


def byte_2_bits(byte):
    return [1 if (byte & i) else 0 for i in BIT_VALUE_MASK]


def bits_2_val(bits):
    return sum(2 ** idx if bit == 1 else 0 for idx, bit in enumerate(bits[::-1]))


def decode_chunk(chunk, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
    bit_lists = [reversed(byte_2_bits(b)) for b in chunk[::-1]]
    bits = list(it.chain(*bit_lists))
    channels = []
    cur_chan_bits = []
    for idx, bit in enumerate(bits[:-ignored_heading_bits]):
        if idx % 8 in ignored_bits_indexes:
            continue
        cur_chan_bits.append(bit)
        if len(cur_chan_bits) == bits_per_channel:
            channels.append(bits_2_val(cur_chan_bits[::-1]))
            cur_chan_bits = []
    if cur_chan_bits:
        raise ValueError("Something went wrong while decoding: ", cur_chan_bits)
    return channels[::-1]


def read_data(stream, channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
    while 1:
        t = stream.read(START_MARKER_LEN)
        if not t:
            break
        if t != START_MARKER:
            continue
        print("Start marker...")
        size = chunk_size(channel_count=channel_count, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
        chunk = stream.read(size)
        if len(chunk) == size:
            decoded = decode_chunk(chunk, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
            print("Decoded: {:}\n".format(decoded))
    print("End of data.")


def main(*argv):
                         # 1st chunk is the one in the question, I played a bit with next ones
    b =   START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F" \
        + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7E" \
        + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x60\x01" \
        + START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x5F\x7F" \
        + START_MARKER + b"\x00\x00\x3F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"

    read_data(io.BytesIO(b))


if __name__ == "__main__":
    print("Python {:s} {:03d}bit on {:s}\n".format(" ".join(elem.strip() for elem in sys.version.split("\n")),
                                                   64 if sys.maxsize > 0x100000000 else 32, sys.platform))
    rc = main(*sys.argv[1:])
    print("\nDone.")
    sys.exit(rc)

备注:

  • 这种方法(几乎)不使用位操作,而是将数字(字节)中的位处理为数字列表(可能值:0、1)

  • 解码(在完整的数据块上):

    1. 反转:

      1. 块中的所有字节

      2. 每个字节中的所有位

      以相反的顺序获取块位

    2. 遍历bit列表(跳过每个字节的第7th位),当遇到12位时,将它们倒序转换(以“撤销” " bit#1.2.) 反转为添加到频道列表的频道值

    3. Return 倒序的频道列表(“撤消” 字节 反转#1.1.)

  • 使用了一些效用函数(我猜很简单)

  • 可以添加更好的错误处理

输出:

py_pc064_03_08_test0) [cfati@cfati-5510-0:/mnt/e/Work/Dev/Whosebug/q069660629]> python code00.py 
Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] 064bit on linux

Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]

Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094]

Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 1]

Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094, 4095]

Start marker...
Decoded: [0, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]

End of data.

Done.

CristiFati 的解决方案很聪明,而且效果很好。然而,我发现乍一看有点难以理解(当然是我的错),尽管对于用例来说速度足够快,但它并没有达到应有的速度。所以我试图找到更简单、更快的东西。

我的推理是:一旦找到起始标记,我们就可以预先计算每个通道的数据位的位置。例如,通道 0 将使用字节 0 的位 4-7、字节 1 的位 1-7 和字节 2 的位 1;通道 1 将使用字节 2 的位 2-7 和字节 3 的位 1-6;等等。请注意,由于数据长度为 12 位,每个通道将跨越 2 或 3 个字节。

以通道 1 为例:我们使用第一个字节的 4 位,因此我们将其屏蔽为 2**4-1,然后我们需要将其乘以 2**8,因为我们还有其他 8 位要考虑;然后对于第二个字节,我们使用 7 位,因此我们将其屏蔽为 2**7-1,并将其乘以 2**1。最后一个字节不同:我们首先将它除以 2**6 以清除属于下一个通道的位,然后将其屏蔽为 2**1-1.

因此对于每个通道,我们可以存储一个记录字节位置的元组,select 相关位所需的位掩码,以及我们需要移动的位数:最后一个向右移动通道的字节,在前一个字节的左侧。

重要的一点是,这个设置部分只需执行一次。之后我们可以读取我们的块并应用我们需要的计算:

import io

MARK = b"\xA0"

def set_specs():
    CHANNELS_NO = 26
    CHANNEL_SIZE = 12
    BYTE_SIZE = 8
    MARX = b"\xA0"
    bitoffset = 4
    currbyte = 1
    
    chspecs = []
    for ch in range(CHANNELS_NO):
        needed = CHANNEL_SIZE
        spec = []
        while needed > 0:
            available = BYTE_SIZE - bitoffset
            if needed > available:
                mask = 2**available - 1
                needed -= available
                lshift = needed
                spec.append((currbyte, mask, lshift))
                currbyte += 1
                bitoffset = 1
            else:
                mask = 2**needed - 1
                rshift = available - needed
                bitoffset += needed
                needed = 0
                spec.append((currbyte, mask, rshift))
        chspecs.append(spec)
    return chspecs

def do_read(chspecs, data):
    chvals = []
    databytes = io.BytesIO(data).read()
    for spec in chspecs:
        chval = 0
        for currbyte, mask, lshift in spec[:-1]:
            chval += (databytes[currbyte] & mask) << lshift
        currbyte, mask, rshift = spec[-1]
        chval += (databytes[currbyte] >> rshift) & mask
        chvals.append(chval)
    return chvals

specs = set_specs()
data = b"\xA0\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
vals = do_read(specs, data)

这在我的 PC 上快了大约 170 倍。

重要说明:如果我理解正确,您的设备会发送开始标记和前 4 位数据,因此您(几乎)永远不会收到“\xA0”字节:如果我们坚持假设所有数据位都是“1”,您将收到的是“\xAF”。事实上,我们使用的示例数据的 len 是 46,而不是 45。因此,由于我不清楚这部分,我只是跳过它:我忽略字节 0,并开始读取 4 个较低有效位字节 1.