如何使用 for 循环在多个 goroutine 之间进行通信,其中一个 goroutine 中有阻塞函数调用
How to communicate between multiple goroutines with for loops with blocking function calls inside one of them
我正在编写一个接受 websocket 连接的 Go 应用程序,然后启动:
listen
goroutine 侦听客户端消息的连接,并根据通过通道接收到的消息为客户端发送响应到 updateClient
。
updateClient
写入连接的 goroutine。
processExternalData
goroutine 从消息队列接收数据,通过通道将数据发送到 updateClient
,以便 updateClient
可以用数据更新客户端。
我正在使用 gorilla library for websocket connections, and its read call is blocking. In addition, both its write and read methods don't support 并发调用,这是我拥有 updateClient
goroutine 的主要原因,它是调用 write 方法的单个例程。
当我需要关闭连接时出现问题,至少在两种情况下会发生:
- 客户端关闭连接或读取时出错。
processExternalData
完成,没有更多数据更新客户端,应该关闭连接。
所以 updateClient
需要以某种方式通知 listen
退出,反之亦然 listen
需要以某种方式通知 updateClient
退出。 updateClient
在 select
中有一个退出通道,但 listen
不能有 select
,因为它已经有一个 for
循环,里面有阻塞读取调用。
所以我所做的是在连接类型上添加 isJobFinished
字段,这是 for
循环工作的条件:
type WsConnection struct {
connection *websocket.Conn
writeChan chan messageWithCb
quitChan chan bool
isJobFinished bool
userID string
}
func processExternalData() {
// receive data from message queue
// send it to client via writeChan
}
func (conn *WsConnection) listen() {
defer func() {
conn.connection.Close()
conn.quitChan <- true
}()
// keep the loop for communication with client
for !conn.isJobFinished {
_, message, err := conn.connection.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
// convert message to type messageWithCb
switch clientMessage.MessageType {
case userNotFound:
conn.writeChan <- messageWithCb{
message: map[string]interface{}{
"type": user,
"payload": false,
},
}
default:
log.Printf("Unknown message type received: %v", clientMessage)
}
}
log.Println("end of listen")
}
func updateClient(w http.ResponseWriter, req *http.Request) {
upgrader.CheckOrigin = func(req *http.Request) bool {
return true
}
connection, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
wsConn := &WsConnection{
connection: connection,
writeChan: make(chan messageWithCb),
quitChan: make(chan bool),
}
go wsConn.listen()
for {
select {
case msg := <-wsConn.writeChan:
err := connection.WriteJSON(msg.message)
if err != nil {
log.Println("connection.WriteJSON error: ", err)
}
if wsConn.isJobFinished {
if msg.callback != nil {
msg.callback() // sends on `wsConn.quitChan` from a goroutine
}
}
case <-wsConn.quitChan:
// clean up
wsConn.connection.Close()
close(wsConn.writeChan)
return
}
}
}
我想知道 Go 中是否存在针对此类情况的更好模式。具体来说,我希望能够在 listen
内也有一个退出通道,这样 updateClient
就可以通知它退出而不是维护 isJobFinished
字段。同样在这种情况下,不保护 isJobFinished
字段没有危险,因为只有一种方法写入它,但如果逻辑变得更复杂,则必须保护 listen
中 for
循环内的字段可能会对性能产生负面影响。
我也无法关闭 quiteChan
,因为 listen
和 updateClient
都在使用它,而且他们无法知道它何时被另一个关闭。
关闭连接以使 listen
goroutine 脱离阻塞读取调用。
在updateClient
中添加defer语句关闭连接,清理其他资源。 Return 来自函数的任何错误或来自退出通道的通知:
updateClient(w http.ResponseWriter, req *http.Request) {
upgrader.CheckOrigin = func(req *http.Request) bool {
return true
}
connection, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer connection.Close() // <--- Add this line
wsConn := &WsConnection{
connection: connection,
writeChan: make(chan messageWithCb),
quitChan: make(chan bool),
}
defer close(writeChan) // <-- cleanup moved out of loop below.
go wsConn.listen()
for {
select {
case msg := <-wsConn.writeChan:
err := connection.WriteJSON(msg.message)
if err != nil {
log.Println("connection.WriteJSON error: ", err)
return
}
case <-wsConn.quitChan:
return
}
}
}
在listen
函数中,循环直到读取连接出错。当 updateClient
关闭连接时立即读取连接 returns 并出现错误。
为了防止listen
在updateClient
returns首先发生的情况下永远阻塞,请关闭退出通道而不是发送值。
func (conn *WsConnection) listen() {
defer func() {
conn.connection.Close()
close(conn.quitChan) // <-- close instead of sending value
}()
// keep the loop for communication with client
for {
_, message, err := conn.connection.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
// convert message to type messageWithCb
switch clientMessage.MessageType {
case userNotFound:
conn.writeChan <- messageWithCb{
message: map[string]interface{}{
"type": user,
"payload": false,
},
}
default:
log.Printf("Unknown message type received: %v", clientMessage)
}
}
log.Println("end of listen")
}
字段 isJobFinished
不需要。
问题中的代码和此答案中的一个问题是 writeChan
的关闭与发送到通道不协调。如果没有看到 processExternalData
函数,我无法评论此问题的解决方案。
使用互斥量而不是 goroutine 来限制写并发可能是有意义的。同样,processExternalData
函数中的代码需要进一步评论此主题。
我正在编写一个接受 websocket 连接的 Go 应用程序,然后启动:
listen
goroutine 侦听客户端消息的连接,并根据通过通道接收到的消息为客户端发送响应到updateClient
。updateClient
写入连接的 goroutine。processExternalData
goroutine 从消息队列接收数据,通过通道将数据发送到updateClient
,以便updateClient
可以用数据更新客户端。
我正在使用 gorilla library for websocket connections, and its read call is blocking. In addition, both its write and read methods don't support 并发调用,这是我拥有 updateClient
goroutine 的主要原因,它是调用 write 方法的单个例程。
当我需要关闭连接时出现问题,至少在两种情况下会发生:
- 客户端关闭连接或读取时出错。
processExternalData
完成,没有更多数据更新客户端,应该关闭连接。
所以 updateClient
需要以某种方式通知 listen
退出,反之亦然 listen
需要以某种方式通知 updateClient
退出。 updateClient
在 select
中有一个退出通道,但 listen
不能有 select
,因为它已经有一个 for
循环,里面有阻塞读取调用。
所以我所做的是在连接类型上添加 isJobFinished
字段,这是 for
循环工作的条件:
type WsConnection struct {
connection *websocket.Conn
writeChan chan messageWithCb
quitChan chan bool
isJobFinished bool
userID string
}
func processExternalData() {
// receive data from message queue
// send it to client via writeChan
}
func (conn *WsConnection) listen() {
defer func() {
conn.connection.Close()
conn.quitChan <- true
}()
// keep the loop for communication with client
for !conn.isJobFinished {
_, message, err := conn.connection.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
// convert message to type messageWithCb
switch clientMessage.MessageType {
case userNotFound:
conn.writeChan <- messageWithCb{
message: map[string]interface{}{
"type": user,
"payload": false,
},
}
default:
log.Printf("Unknown message type received: %v", clientMessage)
}
}
log.Println("end of listen")
}
func updateClient(w http.ResponseWriter, req *http.Request) {
upgrader.CheckOrigin = func(req *http.Request) bool {
return true
}
connection, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
wsConn := &WsConnection{
connection: connection,
writeChan: make(chan messageWithCb),
quitChan: make(chan bool),
}
go wsConn.listen()
for {
select {
case msg := <-wsConn.writeChan:
err := connection.WriteJSON(msg.message)
if err != nil {
log.Println("connection.WriteJSON error: ", err)
}
if wsConn.isJobFinished {
if msg.callback != nil {
msg.callback() // sends on `wsConn.quitChan` from a goroutine
}
}
case <-wsConn.quitChan:
// clean up
wsConn.connection.Close()
close(wsConn.writeChan)
return
}
}
}
我想知道 Go 中是否存在针对此类情况的更好模式。具体来说,我希望能够在 listen
内也有一个退出通道,这样 updateClient
就可以通知它退出而不是维护 isJobFinished
字段。同样在这种情况下,不保护 isJobFinished
字段没有危险,因为只有一种方法写入它,但如果逻辑变得更复杂,则必须保护 listen
中 for
循环内的字段可能会对性能产生负面影响。
我也无法关闭 quiteChan
,因为 listen
和 updateClient
都在使用它,而且他们无法知道它何时被另一个关闭。
关闭连接以使 listen
goroutine 脱离阻塞读取调用。
在updateClient
中添加defer语句关闭连接,清理其他资源。 Return 来自函数的任何错误或来自退出通道的通知:
updateClient(w http.ResponseWriter, req *http.Request) {
upgrader.CheckOrigin = func(req *http.Request) bool {
return true
}
connection, err := upgrader.Upgrade(w, req, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer connection.Close() // <--- Add this line
wsConn := &WsConnection{
connection: connection,
writeChan: make(chan messageWithCb),
quitChan: make(chan bool),
}
defer close(writeChan) // <-- cleanup moved out of loop below.
go wsConn.listen()
for {
select {
case msg := <-wsConn.writeChan:
err := connection.WriteJSON(msg.message)
if err != nil {
log.Println("connection.WriteJSON error: ", err)
return
}
case <-wsConn.quitChan:
return
}
}
}
在listen
函数中,循环直到读取连接出错。当 updateClient
关闭连接时立即读取连接 returns 并出现错误。
为了防止listen
在updateClient
returns首先发生的情况下永远阻塞,请关闭退出通道而不是发送值。
func (conn *WsConnection) listen() {
defer func() {
conn.connection.Close()
close(conn.quitChan) // <-- close instead of sending value
}()
// keep the loop for communication with client
for {
_, message, err := conn.connection.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
// convert message to type messageWithCb
switch clientMessage.MessageType {
case userNotFound:
conn.writeChan <- messageWithCb{
message: map[string]interface{}{
"type": user,
"payload": false,
},
}
default:
log.Printf("Unknown message type received: %v", clientMessage)
}
}
log.Println("end of listen")
}
字段 isJobFinished
不需要。
问题中的代码和此答案中的一个问题是 writeChan
的关闭与发送到通道不协调。如果没有看到 processExternalData
函数,我无法评论此问题的解决方案。
使用互斥量而不是 goroutine 来限制写并发可能是有意义的。同样,processExternalData
函数中的代码需要进一步评论此主题。