net/http:协程间的并发和消息传递

net/http: concurrency and message passing between coroutines

我在 REST API 服务器上工作,该服务器的功能之一是能够在创建新资源或修改现有资源时通过 websocket 通知任意数量的客户端。

我有一个自定义动作路由器,用于将 URL 绑定到函数和 gorillas 的 websocket 库实现。对于 IPC,我决定依赖通道,因为它似乎是 协程之间通信的惯用方式。它的行为也像一个管道,这是我熟悉的概念。

函数 Create 的原型如下所示:

func Create (res http.ResponseWriter, req *http.Request, userdata interface {}) (int, string, interface {})

作为 userdata 结构实例 PipeSet 被传递。它是一个在所有协程之间共享的映射,其中键是 Pipe 的地址( 指向 的指针)并且值相同。这里的基本原理是在删除时加快查找过程。

type Pipe chan string                                                           

type PipeSet struct {                                                           
    sync.Mutex                                                                  
    Pipes map [*Pipe] *Pipe                                                     
}                                                                               

func NewPipe () Pipe {                                                          
    return make (Pipe)                                                          
}                                                                               

func NewPipeSet () PipeSet {                                                    
    var newSet PipeSet                                                      
    newSet.Pipes = make (map[*Pipe] *Pipe)                                  
    return newSet                                                           
}                                                                               

func (o *PipeSet) AddPipe (pipe *Pipe) {                                        
    o.Lock ()                                                                   
    o.Pipes[pipe] = pipe                                                        
    o.Unlock ()                                                                 
}                                                                               

func (o *PipeSet) ForeachPipe (f func (pipe Pipe)) {                            
    o.Lock ()                                                                   
    for k := range (o.Pipes) {                                                  
        f (*o.Pipes[k])                                                         
    }                                                                           
    o.Unlock ()                                                                 
}                                                                               

func (o *PipeSet) DeletePipe (pipe *Pipe) {                                     
    o.Lock ()                                                                   
    delete (o.Pipes, pipe)                                                      
    o.Unlock ()                                                                 
}

当客户端通过 websocket 连接时,会创建一个新通道 (Pipe) 并将其添加到共享 PipeSet。然后,如果创建了一个新资源,协程将通过整个 PipeSet 向每个 Pipe 发送一条消息。然后将消息转发到另一端连接的客户端。

问题区域

我无法检测客户端的 websocket 连接是否仍然存在。我需要知道确定是否应该从 PipeSet 中删除 Pipe。在这种情况下,我依赖 CloseNotifier。它永远不会开火。

代码如下所示(摘录):

var upgrader = websocket.Upgrader {
    CheckOrigin: func (r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade (res, req, nil)

if err != nil {
    marker.MarkError (err)
    return http.StatusBadRequest, "", nil
}

defer conn.Close ()

exitStatus = http.StatusOK
pipe := genstore.NewPipe ()
quit := res.(http.CloseNotifier).CloseNotify ()

genStore.WSChannels.AddPipe (&pipe)

for {
    log.Printf ("waiting for a message")

    select {
        case wsMsg = <-pipe:
            log.Printf ("got a message: %s (num pipes %d)", wsMsg, len (genStore.WSChannels.Pipes))

            if err = conn.WriteMessage (websocket.TextMessage, []byte (wsMsg)); err != nil {
                marker.MarkError (err)
                goto egress
            }

        case <-quit:
            log.Printf ("quit...")
            goto egress
    }
}

egress:
genStore.WSChannels.DeletePipe (&pipe)

当您使用 Gorilla 将 HTTP 连接升级为 WebSocket 连接时,它会劫持该连接并且 net/http 服务器停止为其提供服务。这意味着,您不能依赖那一刻起的 net/http 事件。

检查这个:https://github.com/gorilla/websocket/issues/123

因此,您在这里可以做的是为每个新的 WebSocket 连接启动新的 goroutine,它将从此连接读取数据并在失败时将消息写入 quit 通道。