bufio.NewReader ReadBytes - 同时读取多条消息
bufio.NewReader ReadBytes - Reading multiple message at the same time
我有这个侦听器功能,可以在网络上侦听来自其对等方的消息。
它有时工作正常,但当它同时收到两条消息时,我收到以下错误:
“消息解码错误 - 缓冲区中有额外数据”
是否可以修改为允许同时发送多条消息?
func Listen(peer Peer) {
log.Info(" Listening for messages from: ", peer.Address)
for {
//will listen for message to process ending in newline (\n)
msg, msgErr := bufio.NewReader(peer.conn).ReadBytes([]byte(`\n`)[0])
if msgErr == io.EOF {
peer.conn.Close()
} else if msgErr == nil {
msg, err := hex.DecodeString(string(msg[:len(msg)-1]))
mgsDecoded, decodeErr := DeserializeMessage(msg[:])
if decodeErr == nil {
// use decoded message here
} else {
log.Warn(" Message Decode error - ", decodeErr)
}
}
}
}
//DeserializeMessage - Decode our message from a byte array to
//networkMessage *NetworkMessage
//over the network
func DeserializeMessage(serialized_bytes []byte) (NetworkMessage, error) {
// Create new buffer and decoder
buf := bytes.NewBuffer(serialized_bytes)
enc := gob.NewDecoder(buf)
// Create new Block Instance to load the serialized block into
var networkMessage NetworkMessage
err := enc.Decode(&networkMessage)
return networkMessage, err
}
添加了发送消息的方式:
func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {
log.Debug(" Message Sent -> Peer: ", peer.Address,
" Command: ", string(networkMessage.Command),
" Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))
msg, err := networkMessage.SerializeMessage()
if err != nil {
log.Warn("❌ Error encoding message.")
}
peer.conn.Write([]byte(hex.EncodeToString(msg)))
peer.conn.Write([]byte(`\n`))
}
应用程序在循环的每次迭代中创建和丢弃缓冲区。丢弃的缓冲区可以包含从连接读取的未处理数据。
已通过在循环外创建 bufio.Reader 修复。
表达式 []byte(`\n`)[0]
的计算结果为字节 \
,而不是 \n
。通过将 \n
指定为分隔符来修复。
br := bufio.NewReader(peer.conn)
for {
//will listen for message to process ending in newline (\n)
msg, msgErr := br.ReadBytes('\n')
...
客户端将消息终止符写入\
、n
两个字节。通过使用解释的 string literal 而不是原始字符串文字,将客户端更改为写入单字节 \n
。
peer.conn.Write([]byte("\n"))
如果您可以取消使用换行符分隔的十六进制编码消息的要求,那么您可以直接使用 gob 解码器:
func Listen(peer Peer) {
defer peer.conn.Close()
log.Info(" Listening for messages from: ", peer.Address)
dec := gob.NewDecoder(peer.conn)
for {
var networkMessage NetworkMessage
err := dec.Decode(&networkMessage)
if err != nil {
log.Info("decode error:", err)
return
}
// do something with networkMessage
}
}
对客户端代码进行相应的修改。将字段 enc *gob.Encoder
添加到 Peer
并将字段初始化为 gob.NewEncoder(peer.conn)
。使用 SendMsgToPeer 中的编码器。
func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {
log.Debug(" Message Sent -> Peer: ", peer.Address,
" Command: ", string(networkMessage.Command),
" Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))
err := peer.enc.Encode(networkMessage)
if err != nil {
log.Warn("❌ Error encoding message.")
}
}
我有这个侦听器功能,可以在网络上侦听来自其对等方的消息。
它有时工作正常,但当它同时收到两条消息时,我收到以下错误:
“消息解码错误 - 缓冲区中有额外数据”
是否可以修改为允许同时发送多条消息?
func Listen(peer Peer) {
log.Info(" Listening for messages from: ", peer.Address)
for {
//will listen for message to process ending in newline (\n)
msg, msgErr := bufio.NewReader(peer.conn).ReadBytes([]byte(`\n`)[0])
if msgErr == io.EOF {
peer.conn.Close()
} else if msgErr == nil {
msg, err := hex.DecodeString(string(msg[:len(msg)-1]))
mgsDecoded, decodeErr := DeserializeMessage(msg[:])
if decodeErr == nil {
// use decoded message here
} else {
log.Warn(" Message Decode error - ", decodeErr)
}
}
}
}
//DeserializeMessage - Decode our message from a byte array to
//networkMessage *NetworkMessage
//over the network
func DeserializeMessage(serialized_bytes []byte) (NetworkMessage, error) {
// Create new buffer and decoder
buf := bytes.NewBuffer(serialized_bytes)
enc := gob.NewDecoder(buf)
// Create new Block Instance to load the serialized block into
var networkMessage NetworkMessage
err := enc.Decode(&networkMessage)
return networkMessage, err
}
添加了发送消息的方式:
func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {
log.Debug(" Message Sent -> Peer: ", peer.Address,
" Command: ", string(networkMessage.Command),
" Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))
msg, err := networkMessage.SerializeMessage()
if err != nil {
log.Warn("❌ Error encoding message.")
}
peer.conn.Write([]byte(hex.EncodeToString(msg)))
peer.conn.Write([]byte(`\n`))
}
应用程序在循环的每次迭代中创建和丢弃缓冲区。丢弃的缓冲区可以包含从连接读取的未处理数据。
已通过在循环外创建 bufio.Reader 修复。
表达式 []byte(`\n`)[0]
的计算结果为字节 \
,而不是 \n
。通过将 \n
指定为分隔符来修复。
br := bufio.NewReader(peer.conn)
for {
//will listen for message to process ending in newline (\n)
msg, msgErr := br.ReadBytes('\n')
...
客户端将消息终止符写入\
、n
两个字节。通过使用解释的 string literal 而不是原始字符串文字,将客户端更改为写入单字节 \n
。
peer.conn.Write([]byte("\n"))
如果您可以取消使用换行符分隔的十六进制编码消息的要求,那么您可以直接使用 gob 解码器:
func Listen(peer Peer) {
defer peer.conn.Close()
log.Info(" Listening for messages from: ", peer.Address)
dec := gob.NewDecoder(peer.conn)
for {
var networkMessage NetworkMessage
err := dec.Decode(&networkMessage)
if err != nil {
log.Info("decode error:", err)
return
}
// do something with networkMessage
}
}
对客户端代码进行相应的修改。将字段 enc *gob.Encoder
添加到 Peer
并将字段初始化为 gob.NewEncoder(peer.conn)
。使用 SendMsgToPeer 中的编码器。
func SendMsgToPeer(networkMessage NetworkMessage, peer Peer) {
log.Debug(" Message Sent -> Peer: ", peer.Address,
" Command: ", string(networkMessage.Command),
" Payload: ", string(utils.TruncateString(networkMessage.Payload, 50)))
err := peer.enc.Encode(networkMessage)
if err != nil {
log.Warn("❌ Error encoding message.")
}
}