Python: ctypes hashable c_char 数组替换而不被'\0'字节绊倒
Python: ctypes hashable c_char array replacement without tripping over '\0' bytes
出于说明目的,此脚本创建了一个文件 mapfile
,其中包含文件的内容,作为参数给出,前面加上带有 sha1 校验和的二进制 header,允许在后续运行。
这里需要的是一个可散列的 ctypes.c_char
替代品,它可以用最少的模糊来保存 sha1 校验和,但不会阻塞 '\0' 个字节。
# -*- coding: utf8 -*
import io
import mmap
import ctypes
import hashlib
import logging
from collections import OrderedDict
log = logging.getLogger(__file__)
def align(size, alignment):
"""return size aligned to alignment"""
excess = size % alignment
if excess:
size = size - excess + alignment
return size
class Header(ctypes.Structure):
Identifier = b'HEAD'
_fields_ = [
('id', ctypes.c_char * 4),
('hlen', ctypes.c_uint16),
('plen', ctypes.c_uint32),
('name', ctypes.c_char * 128),
('sha1', ctypes.c_char * 20),
]
HeaderSize = ctypes.sizeof(Header)
class CtsMap:
def __init__(self, ctcls, mm, offset = 0):
self.ctcls = ctcls
self.mm = mm
self.offset = offset
def __enter__(self):
mm = self.mm
offset = self.offset
ctsize = ctypes.sizeof(self.ctcls)
if offset + ctsize > mm.size():
newsize = align(offset + ctsize, mmap.PAGESIZE)
mm.resize(newsize)
self.ctinst = self.ctcls.from_buffer(mm, offset)
return self.ctinst
def __exit__(self, exc_type, exc_value, exc_traceback):
del self.ctinst
self.ctinst = None
class MapFile:
def __init__(self, filename):
try:
# try to create initial file
mapsize = mmap.PAGESIZE
self._fd = open(filename, 'x+b')
self._fd.write(b'[=12=]' * mapsize)
except FileExistsError:
# file exists and is writable
self._fd = open(filename, 'r+b')
self._fd.seek(0, io.SEEK_END)
mapsize = self._fd.tell()
# mmap this file completely
self._fd.seek(0)
self._mm = mmap.mmap(self._fd.fileno(), mapsize)
self._offset = 0
self._toc = OrderedDict()
self.gen_toc()
def gen_toc(self):
while self._offset < self._mm.size():
with CtsMap(Header, self._mm, self._offset) as hd:
if hd.id == Header.Identifier and hd.hlen == HeaderSize:
self._toc[hd.sha1] = self._offset
log.debug('toc: [%s]%s: %s', len(hd.sha1), hd.sha1, self._offset)
self._offset += HeaderSize + hd.plen
else:
break
del hd
def add_data(self, datafile, data):
datasize = len(data)
sha1 = hashlib.sha1()
sha1.update(data)
digest = sha1.digest()
if digest in self._toc:
log.debug('add_data: %s added already', digest)
return None
log.debug('add_data: %s, %s bytes, %s', datafile, datasize, digest)
with CtsMap(Header, self._mm, self._offset) as hd:
hd.id = Header.Identifier
hd.hlen = HeaderSize
hd.plen = datasize
hd.name = datafile
hd.sha1 = digest
del hd
self._offset += HeaderSize
log.debug('add_data: %s', datasize)
blktype = ctypes.c_char * datasize
with CtsMap(blktype, self._mm, self._offset) as blk:
blk.raw = data
del blk
self._offset += datasize
return HeaderSize + datasize
def close(self):
self._mm.close()
self._fd.close()
if __name__ == '__main__':
import os
import sys
logconfig = dict(
level = logging.DEBUG,
format = '%(levelname)5s: %(message)s',
)
logging.basicConfig(**logconfig)
mf = MapFile('mapfile')
for datafile in sys.argv[1:]:
if os.path.isfile(datafile):
try:
data = open(datafile, 'rb').read()
except OSError:
continue
else:
mf.add_data(datafile.encode('utf-8'), data)
mf.close()
运行: python3 hashable_ctypes_bytes.py somefiles*
第二次调用它时,它会使用 sha1 摘要作为键来读取有序字典中收集所有项目的文件。不幸的是,c_char 数组语义有点连线,因为它 也 表现得像 '\0' 终止的 c 字符串,导致这里的校验和被截断。
参见第 3 行和第 4 行:
DEBUG: toc: [20]b'\xcd0\xd7\xd3\xbf\x9f\xe1\xfe\xffr\xa6g#\xee\xf8\x84\xb5S,u': 0
DEBUG: toc: [20]b'\xe9\xfe\x1a;i\xcdG0\x84\x1b\r\x7f\xf9\x14\x868\xbdVl\x8d': 1273
DEBUG: toc: [19]b'\xa2\xdb\xff$&\xfe\x0f\xb4\xcaB<F\x92\xc0\xf1`(\x96N': 3642
DEBUG: toc: [15]b'O\x1b~c\x82\xeb)\x8f\xb5\x9c\x15\xd5e:\xa9': 4650
DEBUG: toc: [20]b'\x80\xe9\xbcF\x97\xdc\x93DG\x90\x19\x8c\xca\xfep\x05\xbdM\xfby': 13841
DEBUG: add_data: b'\xcd0\xd7\xd3\xbf\x9f\xe1\xfe\xffr\xa6g#\xee\xf8\x84\xb5S,u' added already
DEBUG: add_data: b'\xe9\xfe\x1a;i\xcdG0\x84\x1b\r\x7f\xf9\x14\x868\xbdVl\x8d' added already
DEBUG: add_data: b'../python/tmp/colorselect.py', 848 bytes, b'\xa2\xdb\xff$&\xfe\x0f\xb4\xcaB<F\x92\xc0\xf1`(\x96N\x00'
DEBUG: add_data: 848
DEBUG: add_data: b'../python/tmp/DemoCode.py', 9031 bytes, b'O\x1b~c\x82\xeb)\x8f\xb5\x9c\x15\xd5e:\xa9\x00p\x0f\xc04'
DEBUG: add_data: 9031
DEBUG: add_data: b'\x80\xe9\xbcF\x97\xdc\x93DG\x90\x19\x8c\xca\xfep\x05\xbdM\xfby' added already
通常的建议是将 c_char * 20 替换为 c_byte * 20,并以这种方式取消透明字节处理。除了数据转换麻烦之外,c_byte 数组由于是字节数组而不可哈希。我还没有找到一个实用的解决方案,而不需要来回经历大量的转换麻烦,或者求助于 hexdigests,这会使 sha1 摘要大小要求加倍。
我认为,c_char 将其与 C 零终止语义混合的设计决定首先是一个错误。为了解决这个问题,我可以想象向 ctypes 添加一个 c_char_nz 类型,这样就可以解决这个问题。
对于那些仔细阅读代码的人,您可能想知道 ctypes 结构的 del 语句。可以找到关于它的讨论 here:.
虽然下面的代码正在执行您提到的来回转换,但它确实很好地隐藏了问题。我找到了一个包含 null 的散列,该字段现在可以用作字典键。希望对你有帮助。
from ctypes import *
import hashlib
class Test(Structure):
_fields_ = [('_sha1',c_ubyte * 20)]
@property
def sha1(self):
return bytes(self._sha1)
@sha1.setter
def sha1(self, value):
self._sha1 = (c_ubyte * 20)(*value)
test = Test()
test.sha1 = hashlib.sha1(b'aaaaaaaaaaa').digest()
D = {test.sha1:0}
print(D)
输出:
{b'u\\x00\x1fJ\xe3\xc8\x84>ZP\xddj\xa2\xfa#\x89=\xd3\xad': 0}
出于说明目的,此脚本创建了一个文件 mapfile
,其中包含文件的内容,作为参数给出,前面加上带有 sha1 校验和的二进制 header,允许在后续运行。
这里需要的是一个可散列的 ctypes.c_char
替代品,它可以用最少的模糊来保存 sha1 校验和,但不会阻塞 '\0' 个字节。
# -*- coding: utf8 -*
import io
import mmap
import ctypes
import hashlib
import logging
from collections import OrderedDict
log = logging.getLogger(__file__)
def align(size, alignment):
"""return size aligned to alignment"""
excess = size % alignment
if excess:
size = size - excess + alignment
return size
class Header(ctypes.Structure):
Identifier = b'HEAD'
_fields_ = [
('id', ctypes.c_char * 4),
('hlen', ctypes.c_uint16),
('plen', ctypes.c_uint32),
('name', ctypes.c_char * 128),
('sha1', ctypes.c_char * 20),
]
HeaderSize = ctypes.sizeof(Header)
class CtsMap:
def __init__(self, ctcls, mm, offset = 0):
self.ctcls = ctcls
self.mm = mm
self.offset = offset
def __enter__(self):
mm = self.mm
offset = self.offset
ctsize = ctypes.sizeof(self.ctcls)
if offset + ctsize > mm.size():
newsize = align(offset + ctsize, mmap.PAGESIZE)
mm.resize(newsize)
self.ctinst = self.ctcls.from_buffer(mm, offset)
return self.ctinst
def __exit__(self, exc_type, exc_value, exc_traceback):
del self.ctinst
self.ctinst = None
class MapFile:
def __init__(self, filename):
try:
# try to create initial file
mapsize = mmap.PAGESIZE
self._fd = open(filename, 'x+b')
self._fd.write(b'[=12=]' * mapsize)
except FileExistsError:
# file exists and is writable
self._fd = open(filename, 'r+b')
self._fd.seek(0, io.SEEK_END)
mapsize = self._fd.tell()
# mmap this file completely
self._fd.seek(0)
self._mm = mmap.mmap(self._fd.fileno(), mapsize)
self._offset = 0
self._toc = OrderedDict()
self.gen_toc()
def gen_toc(self):
while self._offset < self._mm.size():
with CtsMap(Header, self._mm, self._offset) as hd:
if hd.id == Header.Identifier and hd.hlen == HeaderSize:
self._toc[hd.sha1] = self._offset
log.debug('toc: [%s]%s: %s', len(hd.sha1), hd.sha1, self._offset)
self._offset += HeaderSize + hd.plen
else:
break
del hd
def add_data(self, datafile, data):
datasize = len(data)
sha1 = hashlib.sha1()
sha1.update(data)
digest = sha1.digest()
if digest in self._toc:
log.debug('add_data: %s added already', digest)
return None
log.debug('add_data: %s, %s bytes, %s', datafile, datasize, digest)
with CtsMap(Header, self._mm, self._offset) as hd:
hd.id = Header.Identifier
hd.hlen = HeaderSize
hd.plen = datasize
hd.name = datafile
hd.sha1 = digest
del hd
self._offset += HeaderSize
log.debug('add_data: %s', datasize)
blktype = ctypes.c_char * datasize
with CtsMap(blktype, self._mm, self._offset) as blk:
blk.raw = data
del blk
self._offset += datasize
return HeaderSize + datasize
def close(self):
self._mm.close()
self._fd.close()
if __name__ == '__main__':
import os
import sys
logconfig = dict(
level = logging.DEBUG,
format = '%(levelname)5s: %(message)s',
)
logging.basicConfig(**logconfig)
mf = MapFile('mapfile')
for datafile in sys.argv[1:]:
if os.path.isfile(datafile):
try:
data = open(datafile, 'rb').read()
except OSError:
continue
else:
mf.add_data(datafile.encode('utf-8'), data)
mf.close()
运行: python3 hashable_ctypes_bytes.py somefiles*
第二次调用它时,它会使用 sha1 摘要作为键来读取有序字典中收集所有项目的文件。不幸的是,c_char 数组语义有点连线,因为它 也 表现得像 '\0' 终止的 c 字符串,导致这里的校验和被截断。
参见第 3 行和第 4 行:
DEBUG: toc: [20]b'\xcd0\xd7\xd3\xbf\x9f\xe1\xfe\xffr\xa6g#\xee\xf8\x84\xb5S,u': 0
DEBUG: toc: [20]b'\xe9\xfe\x1a;i\xcdG0\x84\x1b\r\x7f\xf9\x14\x868\xbdVl\x8d': 1273
DEBUG: toc: [19]b'\xa2\xdb\xff$&\xfe\x0f\xb4\xcaB<F\x92\xc0\xf1`(\x96N': 3642
DEBUG: toc: [15]b'O\x1b~c\x82\xeb)\x8f\xb5\x9c\x15\xd5e:\xa9': 4650
DEBUG: toc: [20]b'\x80\xe9\xbcF\x97\xdc\x93DG\x90\x19\x8c\xca\xfep\x05\xbdM\xfby': 13841
DEBUG: add_data: b'\xcd0\xd7\xd3\xbf\x9f\xe1\xfe\xffr\xa6g#\xee\xf8\x84\xb5S,u' added already
DEBUG: add_data: b'\xe9\xfe\x1a;i\xcdG0\x84\x1b\r\x7f\xf9\x14\x868\xbdVl\x8d' added already
DEBUG: add_data: b'../python/tmp/colorselect.py', 848 bytes, b'\xa2\xdb\xff$&\xfe\x0f\xb4\xcaB<F\x92\xc0\xf1`(\x96N\x00'
DEBUG: add_data: 848
DEBUG: add_data: b'../python/tmp/DemoCode.py', 9031 bytes, b'O\x1b~c\x82\xeb)\x8f\xb5\x9c\x15\xd5e:\xa9\x00p\x0f\xc04'
DEBUG: add_data: 9031
DEBUG: add_data: b'\x80\xe9\xbcF\x97\xdc\x93DG\x90\x19\x8c\xca\xfep\x05\xbdM\xfby' added already
通常的建议是将 c_char * 20 替换为 c_byte * 20,并以这种方式取消透明字节处理。除了数据转换麻烦之外,c_byte 数组由于是字节数组而不可哈希。我还没有找到一个实用的解决方案,而不需要来回经历大量的转换麻烦,或者求助于 hexdigests,这会使 sha1 摘要大小要求加倍。
我认为,c_char 将其与 C 零终止语义混合的设计决定首先是一个错误。为了解决这个问题,我可以想象向 ctypes 添加一个 c_char_nz 类型,这样就可以解决这个问题。
对于那些仔细阅读代码的人,您可能想知道 ctypes 结构的 del 语句。可以找到关于它的讨论 here:.
虽然下面的代码正在执行您提到的来回转换,但它确实很好地隐藏了问题。我找到了一个包含 null 的散列,该字段现在可以用作字典键。希望对你有帮助。
from ctypes import *
import hashlib
class Test(Structure):
_fields_ = [('_sha1',c_ubyte * 20)]
@property
def sha1(self):
return bytes(self._sha1)
@sha1.setter
def sha1(self, value):
self._sha1 = (c_ubyte * 20)(*value)
test = Test()
test.sha1 = hashlib.sha1(b'aaaaaaaaaaa').digest()
D = {test.sha1:0}
print(D)
输出:
{b'u\\x00\x1fJ\xe3\xc8\x84>ZP\xddj\xa2\xfa#\x89=\xd3\xad': 0}