从 Python 中的数据流中提取携带信息的 12 位
Extract 12 bits that carry information from a data stream in Python
我正在研究EEG(时间序列采集设备)串行驱动的实现。该设备使用 12 位 x 26 个总通道对数据进行编码,采样率为 200Hz
串行数据流由信号字节 0xA0 和 45 个字节组成,这些字节携带 26 个通道的数据,每个通道用 12 位编码。
但这里有一个问题,这 12 位在 45 字节块中的位置并不固定。第一个 bye 仅使用 4 LSB,而其余 44 个 7 LSB。
为了更加说明这一点,我将尝试在下面以图形方式表示它。假设我们已经启动了放大器,它总是为所有通道提供 4095(用 12 位表示的最大 int 值)(所以我们的数据都是“1”),那么我们有这样的东西:
a0 0f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f a0 下一个样本...
这必须映射到值为 4095 的 int(1,...,26)。
所以,我做了一个 python 代码,首先找到块的开头,然后将所有内容保存在一个 int/long 中,然后我删除了固定位置的位,最多追加 8有效的 0 位以构成 16 位表示并将字节数组转换为整数列表。
可以正常工作,但问题是速度。似乎代码为单个样本花费了相当多的时间,并且它必须在一秒钟内完成 200 次。让我们包括一些真正的串行读取方法的其他延迟,对于所有 200 个样本,一切都必须保持在 1 秒以下
#Python code
def readByte():
#mockup
return 0xA0
def read45bytes():
return int(0x0f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f)
def remove_bit(num, i):
mask = num >> (i + 1)
mask = mask << i
right = ((1 << i) - 1) & num
return mask | right
def insert_mult_bits(num, bits, len, i):
mask = num >> i
mask = (mask << len) | bits
mask = mask << i
right = ((1 << i) - 1) & num
return right | mask
def main():
while(readByte()!=0xA0):
print("Searching for the beginning of the packet of 45 bytes...")
print("Beginning of the packet of 45 bytes found")
#read whole sample
sample=read45bytes()
#remove unused bits
corr=0;
for i in range(7, sample.bit_length(), 8):
sample=remove_bit(sample,i-corr);
corr=corr+1;
#add HSB to make 2byte representation
corr=0;
for i in range(12,sample.bit_length(),12):
sample=insert_mult_bits(sample,0,4,i+corr)
corr=corr+4;
#convert to bytes 26channels x 2 bytes, bigendian
bt=sample.to_bytes(26*2,'big');
#assign the result to int list
idx=0;
out=[];
for i in range(0,26*2-1,2):
out.append(int(int((bt[i]<<8 | bt[i+1]))))
idx=idx+1;
#print first sample of the channel 1
print(out.pop(0))
code00.py:
#!/usr/bin/env python
import sys
import math
import io
import itertools as it
START_MARKER = b"\xA0"
START_MARKER_LEN = len(START_MARKER)
BIT_VALUE_MASK = list(2 ** i for i in range(7, -1, -1))
IGNORED_BITS_INDEXES = (7,)
def chunk_size(channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
libi = len(ignored_bits_indexes)
#if libi > 7:
# raise ValueError
bits = channel_count * bits_per_channel
bpb = 8 - libi
q, r = divmod(bits, bpb)
r += ignored_heading_bits
return q + math.ceil(r / 8)
def byte_2_bits(byte):
return [1 if (byte & i) else 0 for i in BIT_VALUE_MASK]
def bits_2_val(bits):
return sum(2 ** idx if bit == 1 else 0 for idx, bit in enumerate(bits[::-1]))
def decode_chunk(chunk, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
bit_lists = [reversed(byte_2_bits(b)) for b in chunk[::-1]]
bits = list(it.chain(*bit_lists))
channels = []
cur_chan_bits = []
for idx, bit in enumerate(bits[:-ignored_heading_bits]):
if idx % 8 in ignored_bits_indexes:
continue
cur_chan_bits.append(bit)
if len(cur_chan_bits) == bits_per_channel:
channels.append(bits_2_val(cur_chan_bits[::-1]))
cur_chan_bits = []
if cur_chan_bits:
raise ValueError("Something went wrong while decoding: ", cur_chan_bits)
return channels[::-1]
def read_data(stream, channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
while 1:
t = stream.read(START_MARKER_LEN)
if not t:
break
if t != START_MARKER:
continue
print("Start marker...")
size = chunk_size(channel_count=channel_count, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
chunk = stream.read(size)
if len(chunk) == size:
decoded = decode_chunk(chunk, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
print("Decoded: {:}\n".format(decoded))
print("End of data.")
def main(*argv):
# 1st chunk is the one in the question, I played a bit with next ones
b = START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7E" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x60\x01" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x5F\x7F" \
+ START_MARKER + b"\x00\x00\x3F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
read_data(io.BytesIO(b))
if __name__ == "__main__":
print("Python {:s} {:03d}bit on {:s}\n".format(" ".join(elem.strip() for elem in sys.version.split("\n")),
64 if sys.maxsize > 0x100000000 else 32, sys.platform))
rc = main(*sys.argv[1:])
print("\nDone.")
sys.exit(rc)
备注:
这种方法(几乎)不使用位操作,而是将数字(字节)中的位处理为数字列表(可能值:0、1)
解码(在完整的数据块上):
反转:
块中的所有字节
每个字节中的所有位
以相反的顺序获取块位
遍历bit列表(跳过每个字节的第7th位),当遇到12位时,将它们倒序转换(以“撤销” " bit 从 #1.2.) 反转为添加到频道列表的频道值
Return 倒序的频道列表(“撤消” 字节 从 反转#1.1.)
使用了一些效用函数(我猜很简单)
可以添加更好的错误处理
输出:
py_pc064_03_08_test0) [cfati@cfati-5510-0:/mnt/e/Work/Dev/Whosebug/q069660629]> python code00.py
Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] 064bit on linux
Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]
Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094]
Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 1]
Start marker...
Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094, 4095]
Start marker...
Decoded: [0, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095]
End of data.
Done.
CristiFati 的解决方案很聪明,而且效果很好。然而,我发现乍一看有点难以理解(当然是我的错),尽管对于用例来说速度足够快,但它并没有达到应有的速度。所以我试图找到更简单、更快的东西。
我的推理是:一旦找到起始标记,我们就可以预先计算每个通道的数据位的位置。例如,通道 0 将使用字节 0 的位 4-7、字节 1 的位 1-7 和字节 2 的位 1;通道 1 将使用字节 2 的位 2-7 和字节 3 的位 1-6;等等。请注意,由于数据长度为 12 位,每个通道将跨越 2 或 3 个字节。
以通道 1 为例:我们使用第一个字节的 4 位,因此我们将其屏蔽为 2**4-1
,然后我们需要将其乘以 2**8
,因为我们还有其他 8 位要考虑;然后对于第二个字节,我们使用 7 位,因此我们将其屏蔽为 2**7-1
,并将其乘以 2**1
。最后一个字节不同:我们首先将它除以 2**6
以清除属于下一个通道的位,然后将其屏蔽为 2**1-1
.
因此对于每个通道,我们可以存储一个记录字节位置的元组,select 相关位所需的位掩码,以及我们需要移动的位数:最后一个向右移动通道的字节,在前一个字节的左侧。
重要的一点是,这个设置部分只需执行一次。之后我们可以读取我们的块并应用我们需要的计算:
import io
MARK = b"\xA0"
def set_specs():
CHANNELS_NO = 26
CHANNEL_SIZE = 12
BYTE_SIZE = 8
MARX = b"\xA0"
bitoffset = 4
currbyte = 1
chspecs = []
for ch in range(CHANNELS_NO):
needed = CHANNEL_SIZE
spec = []
while needed > 0:
available = BYTE_SIZE - bitoffset
if needed > available:
mask = 2**available - 1
needed -= available
lshift = needed
spec.append((currbyte, mask, lshift))
currbyte += 1
bitoffset = 1
else:
mask = 2**needed - 1
rshift = available - needed
bitoffset += needed
needed = 0
spec.append((currbyte, mask, rshift))
chspecs.append(spec)
return chspecs
def do_read(chspecs, data):
chvals = []
databytes = io.BytesIO(data).read()
for spec in chspecs:
chval = 0
for currbyte, mask, lshift in spec[:-1]:
chval += (databytes[currbyte] & mask) << lshift
currbyte, mask, rshift = spec[-1]
chval += (databytes[currbyte] >> rshift) & mask
chvals.append(chval)
return chvals
specs = set_specs()
data = b"\xA0\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
vals = do_read(specs, data)
这在我的 PC 上快了大约 170 倍。
重要说明:如果我理解正确,您的设备会发送开始标记和前 4 位数据,因此您(几乎)永远不会收到“\xA0”字节:如果我们坚持假设所有数据位都是“1”,您将收到的是“\xAF”。事实上,我们使用的示例数据的 len
是 46,而不是 45。因此,由于我不清楚这部分,我只是跳过它:我忽略字节 0,并开始读取 4 个较低有效位字节 1.
我正在研究EEG(时间序列采集设备)串行驱动的实现。该设备使用 12 位 x 26 个总通道对数据进行编码,采样率为 200Hz
串行数据流由信号字节 0xA0 和 45 个字节组成,这些字节携带 26 个通道的数据,每个通道用 12 位编码。
但这里有一个问题,这 12 位在 45 字节块中的位置并不固定。第一个 bye 仅使用 4 LSB,而其余 44 个 7 LSB。
为了更加说明这一点,我将尝试在下面以图形方式表示它。假设我们已经启动了放大器,它总是为所有通道提供 4095(用 12 位表示的最大 int 值)(所以我们的数据都是“1”),那么我们有这样的东西:
a0 0f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f 7f a0 下一个样本...
这必须映射到值为 4095 的 int(1,...,26)。
所以,我做了一个 python 代码,首先找到块的开头,然后将所有内容保存在一个 int/long 中,然后我删除了固定位置的位,最多追加 8有效的 0 位以构成 16 位表示并将字节数组转换为整数列表。
可以正常工作,但问题是速度。似乎代码为单个样本花费了相当多的时间,并且它必须在一秒钟内完成 200 次。让我们包括一些真正的串行读取方法的其他延迟,对于所有 200 个样本,一切都必须保持在 1 秒以下
#Python code
def readByte():
#mockup
return 0xA0
def read45bytes():
return int(0x0f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f7f)
def remove_bit(num, i):
mask = num >> (i + 1)
mask = mask << i
right = ((1 << i) - 1) & num
return mask | right
def insert_mult_bits(num, bits, len, i):
mask = num >> i
mask = (mask << len) | bits
mask = mask << i
right = ((1 << i) - 1) & num
return right | mask
def main():
while(readByte()!=0xA0):
print("Searching for the beginning of the packet of 45 bytes...")
print("Beginning of the packet of 45 bytes found")
#read whole sample
sample=read45bytes()
#remove unused bits
corr=0;
for i in range(7, sample.bit_length(), 8):
sample=remove_bit(sample,i-corr);
corr=corr+1;
#add HSB to make 2byte representation
corr=0;
for i in range(12,sample.bit_length(),12):
sample=insert_mult_bits(sample,0,4,i+corr)
corr=corr+4;
#convert to bytes 26channels x 2 bytes, bigendian
bt=sample.to_bytes(26*2,'big');
#assign the result to int list
idx=0;
out=[];
for i in range(0,26*2-1,2):
out.append(int(int((bt[i]<<8 | bt[i+1]))))
idx=idx+1;
#print first sample of the channel 1
print(out.pop(0))
code00.py:
#!/usr/bin/env python
import sys
import math
import io
import itertools as it
START_MARKER = b"\xA0"
START_MARKER_LEN = len(START_MARKER)
BIT_VALUE_MASK = list(2 ** i for i in range(7, -1, -1))
IGNORED_BITS_INDEXES = (7,)
def chunk_size(channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
libi = len(ignored_bits_indexes)
#if libi > 7:
# raise ValueError
bits = channel_count * bits_per_channel
bpb = 8 - libi
q, r = divmod(bits, bpb)
r += ignored_heading_bits
return q + math.ceil(r / 8)
def byte_2_bits(byte):
return [1 if (byte & i) else 0 for i in BIT_VALUE_MASK]
def bits_2_val(bits):
return sum(2 ** idx if bit == 1 else 0 for idx, bit in enumerate(bits[::-1]))
def decode_chunk(chunk, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
bit_lists = [reversed(byte_2_bits(b)) for b in chunk[::-1]]
bits = list(it.chain(*bit_lists))
channels = []
cur_chan_bits = []
for idx, bit in enumerate(bits[:-ignored_heading_bits]):
if idx % 8 in ignored_bits_indexes:
continue
cur_chan_bits.append(bit)
if len(cur_chan_bits) == bits_per_channel:
channels.append(bits_2_val(cur_chan_bits[::-1]))
cur_chan_bits = []
if cur_chan_bits:
raise ValueError("Something went wrong while decoding: ", cur_chan_bits)
return channels[::-1]
def read_data(stream, channel_count=26, bits_per_channel=12, ignored_bits_indexes=IGNORED_BITS_INDEXES, ignored_heading_bits=4):
while 1:
t = stream.read(START_MARKER_LEN)
if not t:
break
if t != START_MARKER:
continue
print("Start marker...")
size = chunk_size(channel_count=channel_count, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
chunk = stream.read(size)
if len(chunk) == size:
decoded = decode_chunk(chunk, bits_per_channel=bits_per_channel, ignored_bits_indexes=ignored_bits_indexes, ignored_heading_bits=ignored_heading_bits)
print("Decoded: {:}\n".format(decoded))
print("End of data.")
def main(*argv):
# 1st chunk is the one in the question, I played a bit with next ones
b = START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7E" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x60\x01" \
+ START_MARKER + b"\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x5F\x7F" \
+ START_MARKER + b"\x00\x00\x3F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
read_data(io.BytesIO(b))
if __name__ == "__main__":
print("Python {:s} {:03d}bit on {:s}\n".format(" ".join(elem.strip() for elem in sys.version.split("\n")),
64 if sys.maxsize > 0x100000000 else 32, sys.platform))
rc = main(*sys.argv[1:])
print("\nDone.")
sys.exit(rc)
备注:
这种方法(几乎)不使用位操作,而是将数字(字节)中的位处理为数字列表(可能值:0、1)
解码(在完整的数据块上):
反转:
块中的所有字节
每个字节中的所有位
以相反的顺序获取块位
遍历bit列表(跳过每个字节的第7th位),当遇到12位时,将它们倒序转换(以“撤销” " bit 从 #1.2.) 反转为添加到频道列表的频道值
Return 倒序的频道列表(“撤消” 字节 从 反转#1.1.)
使用了一些效用函数(我猜很简单)
可以添加更好的错误处理
输出:
py_pc064_03_08_test0) [cfati@cfati-5510-0:/mnt/e/Work/Dev/Whosebug/q069660629]> python code00.py Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] 064bit on linux Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 1] Start marker... Decoded: [4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4094, 4095] Start marker... Decoded: [0, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095, 4095] End of data. Done.
CristiFati 的解决方案很聪明,而且效果很好。然而,我发现乍一看有点难以理解(当然是我的错),尽管对于用例来说速度足够快,但它并没有达到应有的速度。所以我试图找到更简单、更快的东西。
我的推理是:一旦找到起始标记,我们就可以预先计算每个通道的数据位的位置。例如,通道 0 将使用字节 0 的位 4-7、字节 1 的位 1-7 和字节 2 的位 1;通道 1 将使用字节 2 的位 2-7 和字节 3 的位 1-6;等等。请注意,由于数据长度为 12 位,每个通道将跨越 2 或 3 个字节。
以通道 1 为例:我们使用第一个字节的 4 位,因此我们将其屏蔽为 2**4-1
,然后我们需要将其乘以 2**8
,因为我们还有其他 8 位要考虑;然后对于第二个字节,我们使用 7 位,因此我们将其屏蔽为 2**7-1
,并将其乘以 2**1
。最后一个字节不同:我们首先将它除以 2**6
以清除属于下一个通道的位,然后将其屏蔽为 2**1-1
.
因此对于每个通道,我们可以存储一个记录字节位置的元组,select 相关位所需的位掩码,以及我们需要移动的位数:最后一个向右移动通道的字节,在前一个字节的左侧。
重要的一点是,这个设置部分只需执行一次。之后我们可以读取我们的块并应用我们需要的计算:
import io
MARK = b"\xA0"
def set_specs():
CHANNELS_NO = 26
CHANNEL_SIZE = 12
BYTE_SIZE = 8
MARX = b"\xA0"
bitoffset = 4
currbyte = 1
chspecs = []
for ch in range(CHANNELS_NO):
needed = CHANNEL_SIZE
spec = []
while needed > 0:
available = BYTE_SIZE - bitoffset
if needed > available:
mask = 2**available - 1
needed -= available
lshift = needed
spec.append((currbyte, mask, lshift))
currbyte += 1
bitoffset = 1
else:
mask = 2**needed - 1
rshift = available - needed
bitoffset += needed
needed = 0
spec.append((currbyte, mask, rshift))
chspecs.append(spec)
return chspecs
def do_read(chspecs, data):
chvals = []
databytes = io.BytesIO(data).read()
for spec in chspecs:
chval = 0
for currbyte, mask, lshift in spec[:-1]:
chval += (databytes[currbyte] & mask) << lshift
currbyte, mask, rshift = spec[-1]
chval += (databytes[currbyte] >> rshift) & mask
chvals.append(chval)
return chvals
specs = set_specs()
data = b"\xA0\x0F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F\x7F"
vals = do_read(specs, data)
这在我的 PC 上快了大约 170 倍。
重要说明:如果我理解正确,您的设备会发送开始标记和前 4 位数据,因此您(几乎)永远不会收到“\xA0”字节:如果我们坚持假设所有数据位都是“1”,您将收到的是“\xAF”。事实上,我们使用的示例数据的 len
是 46,而不是 45。因此,由于我不清楚这部分,我只是跳过它:我忽略字节 0,并开始读取 4 个较低有效位字节 1.