GoLang Protobuf:如何使用同一个 tcp 连接发送多条消息?

GoLang Protobuf: How to send multiple messages using the same tcp connection?

我正在使用 GoLang protobuf 对通过单个 tcp 连接发送的消息进行编码(和解码)。

.proto 结构

message Prepare{
   int64 instance = 1;
   int64 round = 2;
   int64 nodeId = 3;
}

然后我使用protoc工具生成相应的存根

这就是我将内容写入线路的方式。

func (t *Prepare) Marshal(wire io.Writer) {

    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

这就是我在接收方读取和解组的方式。

func (t *Prepare) Unmarshal(wire io.Reader) error {
    data := make([]byte, 8*1024*1024) 
    length, err := wire.Read(data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}

如果对于每个 protobuf 消息,都会生成一个新的 tcp 连接,则上述方法可以正常工作。但是当单个 tcp 连接用于传输多个消息(持久连接)时,解组失败并显示错误 proto: invalid field number

出现此问题是因为,使用单个连接发送的 protobuf 消息不强制任何消息边界,因此在读取 length, err := wire.Read(data)data 缓冲区可以包含对应于 1) 多个 protobuff 消息的字节, 和 2) 部分 protobuff 消息。

protobuf 文档提到了以下解决方案。

If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself. When you read the messages back in, you read the size, then read the bytes into a separate buffer, then parse from that buffer. (If you want to avoid copying bytes to a separate buffer, check out the CodedInputStream class (in both C++ and Java) which can be told to limit reads to a certain number of bytes.)

虽然这是一种直观的方法,但它归结为先有鸡还是先有蛋的问题。写入线路的字节数组的长度(取自 data, err := proto.Marshal(t); len(data) )不固定,并且不知道表示该数字 (len(data)) 需要多少字节。现在我们遇到了同样的问题,如何发送字节数组的长度以在接收方读取,而实际上不知道 length 将占用多少字节(换句话说,接收方如何知道length字段对应多少字节)

对此有什么建议吗?

谢谢

我会推荐使用 gRPC,但你已经说过你不想要那个。 我也可以推荐发送简单的 UTP 包,因为 UDP 根本不需要连接。

如果您想坚持目前的方法,解决方法很简单: 将 protobuf 编组为字节数组后,您就知道它的长度了。它是 len(data) ,这是您要首先写入的值。 wire.Write() 写入的实际字节数将相同。如果不是,就是连接有问题,包只写了partialy。所以接收方无法将其解组。

接收时,首先读取长度,准备一个大小正确的缓冲区,或者更好的是,制作一个 LimitedReader 并从那里解组。

字节数应编码为整数。您可以使用 32 位或 64 位值,您还需要在小端和大端之间做出决定 - 您使用的是什么无关紧要,只要发送方和接收方的大小和字节序相同即可。

看看 https://pkg.go.dev/encoding/binary 和 ByteOrder 上定义的函数:

binary.LittleEndian.PutUint64(w, uint64(len(data)))
length := int64(binary.LittleEndian.Uint64(r))

当然,即使是一个简单的错误,或者你只错了一个字节,剩下的所有数据实际上都是无用的。通过将消息作为专用 UDP 包发送,您可以避免此问题。

针对问题中提到的确切场景详细说明上述答案

func (t *Prepare) Marshal(wire io.Writer) {
    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    lengthWritten := len(data)
    var b [16]byte
    bs := b[:16]
    binary.LittleEndian.PutUint64(bs, uint64(lengthWritten))
    _, err = wire.Write(bs)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

func (t *Prepare) Unmarshal(wire io.Reader) error {

    var b [16]byte
    bs := b[:16]

    _, err := io.ReadFull(wire, bs)
    numBytes := uint64(binary.LittleEndian.Uint64(bs))

    data := make([]byte, numBytes)
    length, err := io.ReadFull(wire, data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}