如何在 python 中加速转换二进制文件?
how to speed up converting binary file in python?
我有二进制文件,我想读取然后写入 python 中的另一个文件(CSV 或 pickle)。
我有一个解决方案,但需要很长时间。
二进制文件由多个数据集组成。数据集有 1 header(4 字节),1 个序列数据(4 字节),100 条消息(ID:2 字节,data:8 字节)。
aa aa aa aa c8 05 00 00 51 02 15 04 ca 8c 00 10
28 80 94 03 00 20 00 00 ff 83 23 98 b0 02 a2 ff
00 07 5a 75 00 00 11 01 00 80 00 ff 4f 2c 0d 84
12 01 ff 50 00 00 ff 2c 0d 00 20 02 0f a4 7e 00
00 fb 0f 12 60 02 06 11 07 30 45 c8 69 20 16 03
05 11 9a 0d 11 0e 00 7f 29 03 d6 9a 81 8c 31 28
00 10 51 02 14 04 cb 50 00 0f 08 80 b0 02 a2 ff
00 07 4b a5 00 00 11 01 00 80 00 ff 4f 25 0d b8
12 01 ff a0 00 00 ff 25 0d 00 20 02 12 c4 7e 00 ...
这是我的文件示例。
为了解析二进制文件,我是这样编码的
def parse(self, bindata):
msg_list = defaultdict(list)
with memoryview(bindata) as mv:
old_seq = None
while mv:
# header
header = mv[:HEADER_SIZE].tobytes()
mv = mv[HEADER_SIZE:]
if (header != HEADER):
logging.error("invalid header")
break
# seq
seq = int.from_bytes(
mv[:SEQUENCE_SIZE].tobytes(), byteorder='little')
mv = mv[SEQUENCE_SIZE:]
if (old_seq and seq - old_seq != 1):
logging.warning(
"sequence error. old=%d / current=%d", old_seq, seq)
old_seq = seq
# msg
for msg_cnt in range(0, MSG_COUNT):
if not mv or mv[:HEADER_SIZE] == HEADER:
break
id = int.from_bytes(
mv[:MSG_ID_SIZE].tobytes(), byteorder='little')
mv = mv[MSG_ID_SIZE:]
msg = None
try:
msg = self.__db.get_message_by_frame_id(id)
except KeyError as e:
logging.exception("unknown can id. %s", hex(id))
mv = mv[MSG_DATA_SIZE:]
continue
body = mv[:MSG_DATA_SIZE].tobytes()
mv = mv[MSG_DATA_SIZE:]
try:
decoded_msg = msg.decode(
body, decode_choices=False)
for k, v in decoded_msg.items():
msg_list[k].append(v)
except Exception as e:
logging.exception("unpack error. %s", str(e))
return msg_list
这段代码似乎是time-consuming,因为它按顺序访问数据。
所以我想知道另一种方法。
我能得到更好的推荐吗?
您需要 struct
模块。那么,每个块都是 1008 字节?
for i in range(0,len(mv),1008):
hdr,seq = struct.unpack('II', mv[i:i+8] )
for msg in range( 8, 1008, 100 ):
id = mv[i+msg] * 256 + mv[i+msg+1]
code = mv[i+msg+2:i+msg+10]
我有二进制文件,我想读取然后写入 python 中的另一个文件(CSV 或 pickle)。
我有一个解决方案,但需要很长时间。
二进制文件由多个数据集组成。数据集有 1 header(4 字节),1 个序列数据(4 字节),100 条消息(ID:2 字节,data:8 字节)。
aa aa aa aa c8 05 00 00 51 02 15 04 ca 8c 00 10
28 80 94 03 00 20 00 00 ff 83 23 98 b0 02 a2 ff
00 07 5a 75 00 00 11 01 00 80 00 ff 4f 2c 0d 84
12 01 ff 50 00 00 ff 2c 0d 00 20 02 0f a4 7e 00
00 fb 0f 12 60 02 06 11 07 30 45 c8 69 20 16 03
05 11 9a 0d 11 0e 00 7f 29 03 d6 9a 81 8c 31 28
00 10 51 02 14 04 cb 50 00 0f 08 80 b0 02 a2 ff
00 07 4b a5 00 00 11 01 00 80 00 ff 4f 25 0d b8
12 01 ff a0 00 00 ff 25 0d 00 20 02 12 c4 7e 00 ...
这是我的文件示例。
为了解析二进制文件,我是这样编码的
def parse(self, bindata):
msg_list = defaultdict(list)
with memoryview(bindata) as mv:
old_seq = None
while mv:
# header
header = mv[:HEADER_SIZE].tobytes()
mv = mv[HEADER_SIZE:]
if (header != HEADER):
logging.error("invalid header")
break
# seq
seq = int.from_bytes(
mv[:SEQUENCE_SIZE].tobytes(), byteorder='little')
mv = mv[SEQUENCE_SIZE:]
if (old_seq and seq - old_seq != 1):
logging.warning(
"sequence error. old=%d / current=%d", old_seq, seq)
old_seq = seq
# msg
for msg_cnt in range(0, MSG_COUNT):
if not mv or mv[:HEADER_SIZE] == HEADER:
break
id = int.from_bytes(
mv[:MSG_ID_SIZE].tobytes(), byteorder='little')
mv = mv[MSG_ID_SIZE:]
msg = None
try:
msg = self.__db.get_message_by_frame_id(id)
except KeyError as e:
logging.exception("unknown can id. %s", hex(id))
mv = mv[MSG_DATA_SIZE:]
continue
body = mv[:MSG_DATA_SIZE].tobytes()
mv = mv[MSG_DATA_SIZE:]
try:
decoded_msg = msg.decode(
body, decode_choices=False)
for k, v in decoded_msg.items():
msg_list[k].append(v)
except Exception as e:
logging.exception("unpack error. %s", str(e))
return msg_list
这段代码似乎是time-consuming,因为它按顺序访问数据。
所以我想知道另一种方法。
我能得到更好的推荐吗?
您需要 struct
模块。那么,每个块都是 1008 字节?
for i in range(0,len(mv),1008):
hdr,seq = struct.unpack('II', mv[i:i+8] )
for msg in range( 8, 1008, 100 ):
id = mv[i+msg] * 256 + mv[i+msg+1]
code = mv[i+msg+2:i+msg+10]