bufio.NewReader ReadBytes - 同时读取多条消息

bufio.NewReader ReadBytes - Reading multiple message at the same time

我有这个侦听器功能,可以在网络上侦听来自其对等方的消息。

它有时工作正常,但当它同时收到两条消息时,我收到以下错误:

“消息解码错误 - 缓冲区中有额外数据”

是否可以修改为允许同时发送多条消息?

func Listen(peer Peer) {

    log.Info(" Listening for messages from: ", peer.Address)

    for {
        //will listen for message to process ending in newline (\n)
        msg, msgErr := bufio.NewReader(peer.conn).ReadBytes([]byte(`\n`)[0])
        if msgErr == io.EOF {
            peer.conn.Close()
        } else if msgErr == nil {
            msg, err := hex.DecodeString(string(msg[:len(msg)-1]))
            mgsDecoded, decodeErr := DeserializeMessage(msg[:])

            if decodeErr == nil { 
                // use decoded message here
            } else {
                log.Warn(" Message Decode error - ", decodeErr)
            }

        }
    }
}

//DeserializeMessage - Decode our message from a byte array to
//networkMessage *NetworkMessage
//over the network
func DeserializeMessage(serialized_bytes []byte) (NetworkMessage, error) {
    // Create new buffer and decoder
    buf := bytes.NewBuffer(serialized_bytes)
    enc := gob.NewDecoder(buf)
    // Create new Block Instance to load the serialized block into
    var networkMessage NetworkMessage
    err := enc.Decode(&networkMessage)
    return networkMessage, err
}

添加了发送消息的方式:


func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {

    log.Debug(" Message Sent -> Peer: ", peer.Address,
        " Command: ", string(networkMessage.Command),
        " Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))

    msg, err := networkMessage.SerializeMessage()
    if err != nil {
        log.Warn("❌ Error encoding message.")
    }

    peer.conn.Write([]byte(hex.EncodeToString(msg)))
    peer.conn.Write([]byte(`\n`))
}

应用程序在循环的每次迭代中创建和丢弃缓冲区。丢弃的缓冲区可以包含从连接读取的未处理数据。

已通过在循环外创建 bufio.Reader 修复。

表达式 []byte(`\n`)[0] 的计算结果为字节 \,而不是 \n。通过将 \n 指定为分隔符来修复。

br := bufio.NewReader(peer.conn)
for {
    //will listen for message to process ending in newline (\n)
    msg, msgErr := br.ReadBytes('\n')
    ...

客户端将消息终止符写入\n两个字节。通过使用解释的 string literal 而不是原始字符串文字,将客户端更改为写入单字节 \n

peer.conn.Write([]byte("\n"))

如果您可以取消使用换行符分隔的十六进制编码消息的要求,那么您可以直接使用 gob 解码器:

func Listen(peer Peer) {
    defer peer.conn.Close()
    log.Info(" Listening for messages from: ", peer.Address)
    dec := gob.NewDecoder(peer.conn)
    for {
       var networkMessage NetworkMessage
       err := dec.Decode(&networkMessage)
       if err != nil {
          log.Info("decode error:", err)
          return
        }
        // do something with networkMessage
    }
}

对客户端代码进行相应的修改。将字段 enc *gob.Encoder 添加到 Peer 并将字段初始化为 gob.NewEncoder(peer.conn)。使用 SendMsgToPeer 中的编码器。

func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {
    log.Debug(" Message Sent -> Peer: ", peer.Address,
        " Command: ", string(networkMessage.Command),
        " Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))
    err := peer.enc.Encode(networkMessage)
    if err != nil {
        log.Warn("❌ Error encoding message.")
    }
}