多次使用时 TCP 连接 returns 'broken pipe' 错误

TCP connection returns 'broken pipe' error when used multiple times

这个问题是关于go和它的net包的。

我写了一个简单的 tcp 服务器处理一些 RPC。客户端正在使用 chan net.Conn 来管理客户端的所有 tcp 连接。服务器是 运行 一个 tcp 侦听器。

代码如下: 客户:

package server

import (
    "errors"
    "log"
    "net"
)

var tcpPool chan net.Conn

func NewClient(connections int, address string) {

    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp4", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(msg []byte) ([]byte, error) {
    conn := getConn()

    log.Println("check conn: ", conn)
    log.Println("msg: ", msg)

    defer releaseConn(conn)
    // send message
    n, err := conn.Write(msg)
    if err != nil {
        log.Panic(err)
    } else if n < len(msg) {
        log.Panic(errors.New("Message did not send in full"))
    }

    // receiving a message
    inBytes := make([]byte, 0)

    for {
        // bufsize 1024, read bufsize bytes each time
        b := make([]byte, bufSize)
        res, err := conn.Read(b)
        log.Println("server sends >>>>>>>>>>>>: ", res)
        if err != nil {
            b[0] = ReError
            break
        }
        inBytes = append(inBytes, b[:res]...)
        // message finished.
        if res < bufSize {
            break
        }
    }
    // check replied message
    if len(inBytes) == 0 {
        return []byte{}, errors.New("empty buffer error")
    }
    log.Println("SendMessage gets: ", inBytes)
    return inBytes, nil
}

func releaseConn(conn net.Conn) error {
    log.Println("return conn to pool")
    select {
    case tcpPool <- conn:
        return nil
    }
}

func getConn() (conn net.Conn) {
    log.Println("Take one from pool")
    select {
    case conn := <-tcpPool:
        return conn
    }
}

服务器

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return errors.Wrapf(err, "Unable to listen on address %s\n", addr)
    }
    log.Println("Listen on", listener.Addr().String())
    defer listener.Close()
    for {
        log.Println("Accept a connection request.")
        conn, err := listener.Accept()
        if err != nil {
            log.Println("Failed accepting a connection request:", err)
            continue
        }
        log.Println("Handle incoming messages.")
        go onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    defer func() {
        if e := recover(); e != nil {
            //later log
            if err, ok := e.(error); ok {
                println("recover", err.Error())
            }
        }
        conn.Close()
    }()
    // load msg
    for {
        buf := make([]byte, bufSize)
        res, err := conn.Read(buf)
        log.Println("server reading: ", res)
        inBytes = append(inBytes, buf[:res]...)
        if err != nil || res < bufSize {
            break
        }
    }

    var req RPCRequest
    err := json.Unmarshal(inBytes, &req)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request: ", req)

    var query UserRequest
    err = json.Unmarshal(req.Query, &query)
    if err != nil {
        log.Panic(err)
    }
    log.Println("rpc request query: ", query)

    // call method to process request
    // good now we can proceed to function call
    // some actual function calls gets a output
    // outBytes, err := json.Marshal(out)
    conn.Write(outBytes)
}

我觉得这个很标准。但是由于某些原因,我只能在客户端发送消息1,然后接下来的2nd和3rd开始显示一些不正常。

1st ---> 成功,得到响应 第二 ---> 客户端可以发送但没有返回,服务器端的日志显示没有消息 第三次 ---> 如果我再从客户端发送一次,它会显示 broken pipe 错误..

有一些不好的处理方式。 首先,确保来自服务器的消息完成的标志取决于 io.EOF,而不是长度

    // message finished.
    if res < 512 {
        break
    }

而不是这个,reader returns io.EOF 是唯一显示消息已完成的符号。 其次,chan type有它的属性来阻塞,不需要使用select.by的方式,你真的需要启动一个goroutine来释放。 getConn

的相同要求
func releaseConn(conn net.Conn)  {
    go func(){
        tcpPool <- conn
    }()
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

第三,监听器不要关闭,下面的代码是错误的

defer listener.Close()

最重要的原因是 在客户端, res, err := conn.Read(b) 这会收到服务器的回复。 当没有回复时,它会阻止而不是 io.EOF,也不会出现其他错误。 这意味着,您不能将持久通信部分装入函数 send() 中。 您可以做一件事来使用 sendmsg() 发送,但永远不要使用 sendmsg() 来处理回复。 你可以这样处理回复

var receive chan string

func init() {
    receive = make(chan string, 10)
}
func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

我在你的代码中发现了几个问题,但我不知道是哪一个导致了你的失败。 这是我根据您编写的代码并进行了一些修复。 client.go:

package main

import (
    "fmt"
    "io"
    "log"
    "net"
)

var tcpPool chan net.Conn
var receive chan string

func init() {
    receive = make(chan string, 10)
}
func NewClient(connections int, address string) {
    tcpPool = make(chan net.Conn, connections)
    for i := 0; i < connections; i++ {
        conn, err := net.Dial("tcp", address)
        if err != nil {
            log.Panic(err)
        }
        tcpPool <- conn
    }
}

func SendMessage(con net.Conn, msg []byte) error {
    // send message
    _, err := con.Write(msg)
    if err != nil {
        log.Panic(err)
    }
    return nil
}

func ReceiveMessage(con net.Conn) {
    // receiving a message
    inBytes := make([]byte, 0, 1000)
    var b = make([]byte, 512)
    for {
        // bufsize 1024, read bufsize bytes each time
        res, err := con.Read(b)
        if err != nil {
            if err == io.EOF {
                break
            }
            fmt.Println(err.Error())
            break
        }
        inBytes = append(inBytes, b[:res]...)
        msg := string(inBytes)
        fmt.Println("receive msg from server:" + msg)
        receive <- msg
    }
}

func getConn() net.Conn {
    con := <-tcpPool
    return con
}

func main() {
    NewClient(20, "localhost:8101")
    con := <-tcpPool
    e := SendMessage(con, []byte("hello, i am client"))
    if e != nil {
        fmt.Println(e.Error())
        return
    }
    go ReceiveMessage(con)
    var msg string
    for {
        select {
        case msg = <-receive:
            fmt.Println(msg)
        }
    }
}

server.go

package main

import (
    "fmt"
    "io"
    "net"
)

func StartTCPServer(network, addr string) error {
    listener, err := net.Listen(network, addr)
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept()
        if err != nil {

            fmt.Println(err.Error())
            continue

        }
        onConn(conn)
    }
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
    inBytes := make([]byte, 0)
    // load msg
    for {
        buf := make([]byte, 512)
        res, err := conn.Read(buf)
        if err != nil {
            if err == io.EOF {
                return
            }
            fmt.Println(err.Error())
            return
        }
        inBytes = append(inBytes, buf[:res]...)

        fmt.Println("receive from client:" + string(inBytes))
        conn.Write([]byte("hello"))
    }
}

func main() {
    if e := StartTCPServer("tcp", ":8101"); e != nil {
        fmt.Println(e.Error())
        return
    }
}

这有效,没有错误。 顺便说一句,我看不到你在客户端或服务器端的什么地方做 con.Close()。关闭是必要的 it.This 意味着连接一旦从池中获取,就不会再放回去。当你认为一个连接结束了,然后关闭它并建立一个新的连接来填充池而不是把它放回去,因为把一个关闭的连接放回池中是一个致命的操作。