接收可以部分写入的 protobuf 编码消息?

Receive protobuf encoded messages that can be partially written?

我正在尝试通过TCP在GoLang中发送和接收protobuff编码的消息,发送方可以在操作中途取消write(),接收方可以正确接收部分消息。

请注意,我使用单个 TCP 连接无限发送不同用户定义类型的消息(这不是每个连接消息的情况)

为了具体解释我的问题,首先我将介绍我如何在不部分写入的情况下实现send/receive。

在我的程序中,有多种消息类型,定义在 .proto 文件中。我将解释一种这样的消息类型的机制。

message MessageType {
  int64 sender = 1;
  int64 receiver = 2;
  int64 operation = 3;
  string message = 4;
}

然后我使用 Golang Protobuf 插件生成存根。

然后在发件人端,我是这样发送的

func send(w *bufio.Writer, code uint8, oriMsg MessageType) {
    err := w.WriteByte(code)
    data, err := proto.Marshal(oriMsg)
    lengthWritten := len(data)
    var b [8]byte
    bs := b[:8]
    binary.LittleEndian.PutUint64(bs, uint64(lengthWritten))
    _, err = w.Write(bs)
    _, err = w.Write(data)
    w.flush()
}

然后在receiver这边,下面是我的接收方式

reader *bufio.Reader
for true {
        if msgType, err = reader.ReadByte(); err != nil {
            panic()
        }
        if msgType == 1 || msgType == 2{
            var b [8]byte
            bs := b[:8]

            _, err := io.ReadFull(reader, bs) 
            numBytes := binary.LittleEndian.Uint64(bs)
            data := make([]byte, numBytes)
            length, err := io.ReadFull(reader, data) 
            msg *MessageType = new(GenericConsensus) // an empty message 
            err = proto.Unmarshal(data[:length], msg) 
            // do something with the message 
            
        } else {
            // unknown message type handler
        }
    }

现在我的问题是,如果发件人在中间中止他的写入怎么办:更具体地说,

  1. 情况一:如果发送方写入消息类型字节,然后中止怎么办?在这种情况下,接收方将读取消息类型字节,并等待接收 8 字节的消息长度,但发送方不发送它。

  2. 情况2:这是情况1的扩展版本,发送方首先只发送消息类型字节,中止发送消息长度和编组消息,然后发送下一条消息:类型字节、长度和编码消息。现在在接收方,一切都出错了,因为消息的顺序(类型、长度和编码消息)被违反了。

所以我的问题是,如何修改接收方,使其在发送方违反预先约定的 type:length:encoded-消息顺序的情况下仍能继续运行?

谢谢

为什么发件人会中止一条消息,然后再发送另一条消息?你的意思是它是一个完全拜占庭式的发件人?或者你在准备 fuzzy-testing?

如果您的 API 合同规定发送方始终需要发送正确的消息,那么接收方可以简单地忽略错误消息,甚至在发现违反 [=26= 时关闭连接]合同。

如果您真的需要它,这里有一些关于如何让它发挥作用的想法:

  • 从一个独特的序言开始 - 但随后你必须确保这个序言永远不会出现在数据中
  • 在将消息发送到解码器之前向消息添加校验和。所以完整的数据包将是:[msg_type : msg_len : msg : chksum ]。这允许接收方检查它是正确的消息还是格式错误的消息。

另外,目前的代码,发送一个最大64位的size很容易崩溃。所以你还应该检查大小是否在一个有用的范围内。我会将其限制为 32 位...