如何使用 for 循环在多个 goroutine 之间进行通信,其中一个 goroutine 中有阻塞函数调用

How to communicate between multiple goroutines with for loops with blocking function calls inside one of them

我正在编写一个接受 websocket 连接的 Go 应用程序,然后启动:

  1. listen goroutine 侦听客户端消息的连接,并根据通过通道接收到的消息为客户端发送响应到 updateClient
  2. updateClient 写入连接的 goroutine。
  3. processExternalData goroutine 从消息队列接收数据,通过通道将数据发送到 updateClient,以便 updateClient 可以用数据更新客户端。

我正在使用 gorilla library for websocket connections, and its read call is blocking. In addition, both its write and read methods don't support 并发调用,这是我拥有 updateClient goroutine 的主要原因,它是调用 write 方法的单个例程。

当我需要关闭连接时出现问题,至少在两种情况下会发生:

  1. 客户端关闭连接或读取时出错。
  2. processExternalData 完成,没有更多数据更新客户端,应该关闭连接。

所以 updateClient 需要以某种方式通知 listen 退出,反之亦然 listen 需要以某种方式通知 updateClient 退出。 updateClientselect 中有一个退出通道,但 listen 不能有 select,因为它已经有一个 for 循环,里面有阻塞读取调用。

所以我所做的是在连接类型上添加 isJobFinished 字段,这是 for 循环工作的条件:

type WsConnection struct {
    connection    *websocket.Conn
    writeChan     chan messageWithCb
    quitChan      chan bool
    isJobFinished bool
    userID        string
}

func processExternalData() {
    // receive data from message queue
    // send it to client via writeChan
}

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        conn.quitChan <- true
    }()

    // keep the loop for communication with client
    for !conn.isJobFinished {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break

        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
            }
            if wsConn.isJobFinished {
                if msg.callback != nil {
                    msg.callback() // sends on `wsConn.quitChan` from a goroutine
                }
            }
        case <-wsConn.quitChan:
            // clean up
            wsConn.connection.Close()
            close(wsConn.writeChan)
            return
        }
    }
}

我想知道 Go 中是否存在针对此类情况的更好模式。具体来说,我希望能够在 listen 内也有一个退出通道,这样 updateClient 就可以通知它退出而不是维护 isJobFinished 字段。同样在这种情况下,不保护 isJobFinished 字段没有危险,因为只有一种方法写入它,但如果逻辑变得更复杂,则必须保护 listenfor 循环内的字段可能会对性能产生负面影响。

我也无法关闭 quiteChan,因为 listenupdateClient 都在使用它,而且他们无法知道它何时被另一个关闭。

关闭连接以使 listen goroutine 脱离阻塞读取调用。

updateClient中添加defer语句关闭连接,清理其他资源。 Return 来自函数的任何错误或来自退出通道的通知:

updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    defer connection.Close() // <--- Add this line
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    defer close(writeChan) // <-- cleanup moved out of loop below.
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
                return
            }
        case <-wsConn.quitChan:
            return
        }
    }
}

listen函数中,循环直到读取连接出错。当 updateClient 关闭连接时立即读取连接 returns 并出现错误。

为了防止listenupdateClient returns首先发生的情况下永远阻塞,请关闭退出通道而不是发送值。

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        close(conn.quitChan) // <-- close instead of sending value
    }()

    // keep the loop for communication with client
    for  {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break

        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

字段 isJobFinished 不需要。

问题中的代码和此答案中的一个问题是 writeChan 的关闭与发送到通道不协调。如果没有看到 processExternalData 函数,我无法评论此问题的解决方案。

使用互斥量而不是 goroutine 来限制写并发可能是有意义的。同样,processExternalData 函数中的代码需要进一步评论此主题。