处理具有自定义帧结构的级联 TCP 流的扭曲协议?

Twisted protocol to handle concatenated TCP stream with custom frame structure?

我得到了一份构建长拉式 TCP 套接字服务器作为设备服务器的工作。我开发的时候选择了Twisted。它适用于我的 Python 设备模拟器。然而,真实设备发送连接(或组合)的 TCP 数据包。我知道这在实际网络和设备中是正常的,尽管 TCP 数据包很短。

它有三种框架结构:

\xFDAA + "realtime_data" + \xCCDD (length is fixed at 150B)

\xFDCC + "extra_data" + \xCCDD (length is fixed at 190B)

\xFDCC + "extra_data" + \xCCDD (length is fixed at 192B)

很明显,\xFDAA\xFDCC是header,\xCCDD是EOT。所以他们确实有束缚。而且它们还暗示了固定长度,没有在协议本身中定义。

但是,我不知道如何使用现有的 Twisted 方法处理自定义框架的串联数据包。在我的开发过程中,我使用了dataReceiver。

到目前为止,我正在尝试解析数据包并将其存储在协议工厂的缓冲区中。当每个新数据包到达时,我会将以前缓冲的数据与新数据组合起来进行解析(如果连接则合并,如果收到组合数据包则将它们分开......但这看起来很脏)。

我查看了 twistedmatrix.com 的常见问题解答。它推荐了以下解决方案:

LineReceiver (with \r\n ending chars) 
NetstringReceiver (with callback for every string received) 
Int8/16/32Receiver (with prefix length information)

然后还推荐使用 AMP 和 PB 高级消息传递。

我想听听 twisted 专家关于如何使用 twisted 官方实现它的任何建议。 URL/demo 代码非常有用。

None LineReceiver, NetstringReceiver, Int8/16/32Receiver, AMPPB 适用于您的问题,因为它们都是实现 特定的框架(在后两者的情况下,消息传递)协议。 相反,您有一个要实现的自定义协议。

幸运的是,这相对简单:Twisted 通过您的 IProtocol 实现的 dataReceived 方法。

处理这类事情的最好方法实际上是实现一个简单的 首先是功能,而不是担心它究竟是如何被插入的 进入扭曲。在您的情况下,您需要一个解析协议的函数; 但是,由于 dataReceived 可能会向您发送部分数据包,因此您需要 确保函数 returns 2 件事:解析的数据,以及任何剩余的 缓冲。一旦你有了这样的功能,你就可以将它插入到 Protocol subclass 很容易。

你对协议的解释不是很清楚,所以这可能不是很清楚 正确,但我将您对消息格式的描述解释为:

octet 0xFD
octet 0xAA
150 octets of "realtimeData"
octet 0xCC
octet 0xDD
octet 0xFD
octet 0xCC
190 octets of "extraData1"
octet 0xCC
octet 0xDD
octet 0xFD
octet 0xCC
192 octets of "extraData2"
octet 0xCC
octet 0xDD

也就是说,单个协议消息长度为544字节,包含3个 必须正确的字段和 12 个字节的填充。

所以让我们先写一个 Message class 来代表一条带有这些的消息 三个字段,使用标准库struct模块解析序列化 它的字段:

from struct import Struct

class Message(object):
    format = Struct(
        "!" # Network endian; always good form.
        "2s" # FD AA
        "150s" # realtimeData
        "4s" # CC DD FD CC
        "190s" # extra1
        "4s" # CC DD FD CC
        "192s" # extra2
        "2s" # CC DD
    )

    def __init__(self, realtimeData, extra1, extra2):
        self.realtimeData = realtimeData
        self.extra1 = extra1
        self.extra2 = extra2

    def toBytes(self):
        return self.format.pack(
            b"\xFD\xAA", self.realtimeData, b"\xCC\xDD\xFD\xCC", self.extra1,
            b"\xCC\xDD\xFD\xCC", self.extra2, b"\xCC\xDD"
        )

    @classmethod
    def fromBytes(cls, octets):
        [fdaa, realtimeData, ccddfdcc, extra1, ccddfdcc2, extra2,
         ccdd] = cls.format.unpack(octets)
        # verify message integrity
        assert fdaa == b"\xFD\xAA"
        assert ccddfdcc == b"\xCC\xDD\xFD\xCC"
        assert ccddfdcc2 == b"\xCC\xDD\xFD\xCC"
        assert ccdd == b"\xCC\xDD"
        return cls(realtimeData, extra1, extra2)

    @classmethod
    def parseStream(cls, streamBytes):
        sz = cls.format.size
        messages = []
        while len(streamBytes) >= sz:
            messageData, streamBytes = streamBytes[:sz], streamBytes[sz:]
            messages.append(cls.fromBytes(messageData))
        return messages, streamBytes

这里与 Twisted 接口的重要部分是最后一部分 parseStream 方法,将一堆字节变成一堆消息,以及 尚未解析的剩余字节。然后我们可以有一个 Protocol 理解实际的网络流,像这样:

from twisted.internet.protocol import Protocol
class MyProtocol(Protocol):
    buffer = b""
    def dataReceived(self, data):
        messages, self.buffer = Message.parseStream(self.buffer + data)
        for message in messages:
            self.messageReceived(message)
    def messageReceived(self, message):
        "do something useful with a message"

而不是调用 self.messageReceived,您可能希望在 self 的一些其他属性,或者可能将 Message 对象中继到 与此协议关联的工厂。由你决定!既然你这么说 你想"parse the packet and store it in a buffer in Factory",也许你 只想做self.factory.messagesBuffer.append(message)。希望这个 似乎比您的 "packet concatenation" 方法更干净,但后者不是 描述清楚足以让我理解你认为令人反感的地方 关于它。