如何让construct.GreedyRange回馈一个Byte?
How to get construct.GreedyRange to give back a Byte?
好的,假设我已经有下面这段代码,并且它完全按预期工作:
from enum import IntEnum

from construct import *


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {x: x + 1 for x in Char})

# Byte-stuffing codec over a sequence of byte values.  Select tries the
# escaped form first (ESC followed by a mapped value, exposing only 'x')
# and falls back to a plain Byte when that form does not apply.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)
示例:
>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
并且:
>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True
现在我需要将 encode/decode 包装在另一个结构中:
# Frame layout: START byte, byte-stuffed payload, STOP byte.  FocusedSeq
# exposes only the 'message' field when parsing and takes it when building.
# NOTE: SLIP is a GreedyRange whose Byte fallback also matches the STOP byte,
# so when parsing nothing is left in the stream for the trailing Const.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    'message' / SLIP,
    Const(Char.STOP, Byte),
)
构建完全符合预期:
>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
但是在解析时,GreedyRange 多消耗了 1 个字节:
>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0
如何让 GreedyRange 退回一个字节?
对于您的情况,您可以简单地重新排列 PROTOCOL 的字段,把 SLIP 放在最后。
# Both framing constants come first so that the greedy SLIP payload is the
# last field.  NOTE: the resulting frame is START, STOP, payload.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    'message' / SLIP,
)
这样 GreedyRange 之后就没有其他字段需要读取,即使它消耗了所有剩余字节,也不会再出现下面的流解析错误:construct.core.StreamError: stream read less than specified amount, expected 1, found 0。
注意:这样做会改变帧的字节顺序,STOP 会紧跟在 START 之后,而不是位于消息末尾。
这是修改后的示例:
from enum import IntEnum

from construct import Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {char: char + 1 for char in Char})

# Byte-stuffing codec: escaped form first, plain Byte as the fallback.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)

# Round-trip the bare codec: every reserved value comes out as ESC, value + 1.
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')

# START and STOP are placed ahead of the payload so that the greedy SLIP
# field is the last one in the frame.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message'),
)

# The framed build carries both constants up front; parsing it yields the
# same payload as parsing the bare SLIP encoding.
protocol_build = PROTOCOL.build(buffer)
assert protocol_build == b'\xab\xbc\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
protocol_parsed = PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
这个问题的解决方案是 NullTerminated(..., term=STOP):它在内部缓冲底层流,一直读到终止符为止,再把缓冲区交给内部的子结构解析。
# Frame layout: START byte, then the byte-stuffed payload terminated by STOP.
# Only the 'message' field is exposed when parsing and supplied when building.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    message=NullTerminated(
        # NOTE build consumes entire stream and appends STOP
        # NOTE parse consumes stream until STOP and passes buffer to GreedyRange
        GreedyRange(
            Select(
                FocusedSeq(
                    'x',
                    Const(Char.ESC, Byte),
                    x=MAPPING,  # NOTE intentionally raises MappingError
                ),
                Byte,  # NOTE fallback for MappingError
            )
        ),
        term=Byte.build(Char.STOP),
    ),
)
另一种方法是使用 construct 的 Adapter 类来修改解析得到的字节序列。
这是另一个代码示例:
from enum import IntEnum

from construct import (
    Byte, Const, FocusedSeq, GreedyRange,
    If, Mapping, Renamed, Select, this, Adapter,
)


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {x: x + 1 for x in Char})

# Byte-stuffing codec: escaped form first, plain Byte as the fallback.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)

# Round-trip the bare codec: every reserved value comes out as ESC, value + 1.
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')


class ProtocolAdapter(Adapter):
    """Strip the framing bytes that PROTOCOL leaves in the parsed payload."""

    def _decode(self, obj, context, path):
        # PROTOCOL skips both Const fields while parsing, so for a
        # well-formed frame the greedy SLIP payload starts with the START
        # byte and ends with the STOP byte.  Remove the first and the last
        # byte in place.  NOTE: the values of those two bytes are not checked.
        obj.pop(0)
        obj.pop(-1)
        return obj

    def _encode(self, obj, context, path):
        # Nothing to add here: PROTOCOL emits START and STOP when building.
        return obj


PROTOCOL = FocusedSeq(
    "message",
    # The framing constants are only emitted on build; on parse they are
    # skipped, so the greedy SLIP field never has to give a byte back.
    If(this._building, Const(Char.START, Byte)),
    "message" / SLIP,
    If(this._building, Const(Char.STOP, Byte)),
)
ADAPTED_PROTOCOL = ProtocolAdapter(PROTOCOL)

# Building yields the START ... STOP frame; parsing it back yields the same
# payload as parsing the bare SLIP encoding.
protocol_build = ADAPTED_PROTOCOL.build(buffer)
assert protocol_build == b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
protocol_parsed = ADAPTED_PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
好的,假设我已经有下面这段代码,并且它完全按预期工作:
from enum import IntEnum

from construct import *


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {x: x + 1 for x in Char})

# Byte-stuffing codec over a sequence of byte values.  Select tries the
# escaped form first (ESC followed by a mapped value, exposing only 'x')
# and falls back to a plain Byte when that form does not apply.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)
示例:
>>> buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
>>> SLIP.build(buffer)
b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
并且:
>>> from operator import eq
>>> all(map(eq, SLIP.parse(SLIP.build(buffer)), buffer))
True
现在我需要将 encode/decode 包装在另一个结构中:
# Frame layout: START byte, byte-stuffed payload, STOP byte.  FocusedSeq
# exposes only the 'message' field when parsing and takes it when building.
# NOTE: SLIP is a GreedyRange whose Byte fallback also matches the STOP byte,
# so when parsing nothing is left in the stream for the trailing Const.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    'message' / SLIP,
    Const(Char.STOP, Byte),
)
构建完全符合预期:
>>> PROTOCOL.build(buffer)
b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
但是在解析时,GreedyRange 多消耗了 1 个字节:
>>> PROTOCOL.parse(b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc')
construct.core.StreamError: stream read less than specified amount, expected 1, found 0
如何让 GreedyRange 退回一个字节?
对于您的情况,您可以简单地重新排列 PROTOCOL 的字段,把 SLIP 放在最后。
# Both framing constants come first so that the greedy SLIP payload is the
# last field.  NOTE: the resulting frame is START, STOP, payload.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    'message' / SLIP,
)
这样 GreedyRange 之后就没有其他字段需要读取,即使它消耗了所有剩余字节,也不会再出现下面的流解析错误:construct.core.StreamError: stream read less than specified amount, expected 1, found 0。
注意:这样做会改变帧的字节顺序,STOP 会紧跟在 START 之后,而不是位于消息末尾。
这是修改后的示例:
from enum import IntEnum

from construct import Byte, Const, FocusedSeq, GreedyRange, Mapping, Renamed, Select


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {char: char + 1 for char in Char})

# Byte-stuffing codec: escaped form first, plain Byte as the fallback.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)

# Round-trip the bare codec: every reserved value comes out as ESC, value + 1.
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')

# START and STOP are placed ahead of the payload so that the greedy SLIP
# field is the last one in the frame.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    Const(Char.STOP, Byte),
    Renamed(SLIP, 'message'),
)

# The framed build carries both constants up front; parsing it yields the
# same payload as parsing the bare SLIP encoding.
protocol_build = PROTOCOL.build(buffer)
assert protocol_build == b'\xab\xbc\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
protocol_parsed = PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed
这个问题的解决方案是 NullTerminated(..., term=STOP):它在内部缓冲底层流,一直读到终止符为止,再把缓冲区交给内部的子结构解析。
# Frame layout: START byte, then the byte-stuffed payload terminated by STOP.
# Only the 'message' field is exposed when parsing and supplied when building.
PROTOCOL = FocusedSeq(
    'message',
    Const(Char.START, Byte),
    message=NullTerminated(
        # NOTE build consumes entire stream and appends STOP
        # NOTE parse consumes stream until STOP and passes buffer to GreedyRange
        GreedyRange(
            Select(
                FocusedSeq(
                    'x',
                    Const(Char.ESC, Byte),
                    x=MAPPING,  # NOTE intentionally raises MappingError
                ),
                Byte,  # NOTE fallback for MappingError
            )
        ),
        term=Byte.build(Char.STOP),
    ),
)
另一种方法是使用 construct 的 Adapter 类来修改解析得到的字节序列。
这是另一个代码示例:
from enum import IntEnum

from construct import (
    Byte, Const, FocusedSeq, GreedyRange,
    If, Mapping, Renamed, Select, this, Adapter,
)


class Char(IntEnum):
    """Reserved byte values that get escaped inside a payload."""

    START = 0xAB
    STOP = 0xBC
    ESC = 0xCD


# Reserved value -> byte written after the ESC marker (value + 1).
MAPPING = Mapping(Byte, {x: x + 1 for x in Char})

# Byte-stuffing codec: escaped form first, plain Byte as the fallback.
SLIP = GreedyRange(
    Select(
        FocusedSeq(
            'x',
            Const(Char.ESC, Byte),
            Renamed(MAPPING, 'x'),
        ),
        Byte,
    )
)

# Round-trip the bare codec: every reserved value comes out as ESC, value + 1.
buffer = bytes([0x00, 0xAB, 0xBC, 0xCD, 0xFF])
slip_build = SLIP.build(buffer)
assert slip_build == b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff'
slip_parsed = SLIP.parse(b'\x00\xcd\xac\xcd\xbd\xcd\xce\xff')


class ProtocolAdapter(Adapter):
    """Strip the framing bytes that PROTOCOL leaves in the parsed payload."""

    def _decode(self, obj, context, path):
        # PROTOCOL skips both Const fields while parsing, so for a
        # well-formed frame the greedy SLIP payload starts with the START
        # byte and ends with the STOP byte.  Remove the first and the last
        # byte in place.  NOTE: the values of those two bytes are not checked.
        obj.pop(0)
        obj.pop(-1)
        return obj

    def _encode(self, obj, context, path):
        # Nothing to add here: PROTOCOL emits START and STOP when building.
        return obj


PROTOCOL = FocusedSeq(
    "message",
    # The framing constants are only emitted on build; on parse they are
    # skipped, so the greedy SLIP field never has to give a byte back.
    If(this._building, Const(Char.START, Byte)),
    "message" / SLIP,
    If(this._building, Const(Char.STOP, Byte)),
)
ADAPTED_PROTOCOL = ProtocolAdapter(PROTOCOL)

# Building yields the START ... STOP frame; parsing it back yields the same
# payload as parsing the bare SLIP encoding.
protocol_build = ADAPTED_PROTOCOL.build(buffer)
assert protocol_build == b'\xab\x00\xcd\xac\xcd\xbd\xcd\xce\xff\xbc'
protocol_parsed = ADAPTED_PROTOCOL.parse(protocol_build)
assert protocol_parsed == slip_parsed