将 websocket 循环与 Golang 中的通道同步

Syncing websocket loops with channels in Golang

我在尝试为给定用户保持某些 websocket 同步时面临两难境地。这是基本设置:

type msg struct {
    Key         string
    Value   string
}

type connStruct struct {
    //...

    ConnRoutineChans []*chan string
    LoggedIn        bool
    Login       string

    //...

    Sockets         []*websocket.Conn
}

var (
    //...

    /*  LIST OF CONNECTED USERS AN THEIR IP ADDRESSES  */

        guestMap sync.Map

)

func main() {
    post("Started...")
    rand.Seed(time.Now().UTC().UnixNano())
    http.HandleFunc("/wss", wsHandler)
    panic(http.ListenAndServeTLS("...", "...", "...", nil))
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    if r.Header.Get("Origin")+":8080" != "https://...:8080" {
        http.Error(w, "Origin not allowed", 403)
        fmt.Println("Client origin not allowed! (https://"+r.Host+")")
        fmt.Println("r.Header Origin: "+r.Header.Get("Origin"))
        return
    }
    ///
    conn, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024)
    if err != nil {
        http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
        fmt.Println("Could not open websocket connection with client!")
    }

    //ADD CONNECTION TO guestMap IF CONNECTION IS nil
    var authString string = /*gets device identity*/;
    var authChan chan string = make(chan string);
    authValue, authOK := guestMap.Load(authString);
    if !authOK {
        // NO SESSION, CREATE A NEW ONE
        newSession = getSession();
        //defer newSession.Close();
        guestMap.Store(authString, connStruct{ LoggedIn: false,
                                ConnRoutineChans: []*chan string{&authChan},
                                         Login: "",
                                        Sockets: []*websocket.Conn{conn}
                                        /* .... */ });
    }else{
        //SESSION STARTED, ADD NEW SOCKET TO Sockets
        var tempConn connStruct = authValue.(connStruct);
        tempConn.Sockets = append(tempConn.Sockets, conn);
        tempConn.ConnRoutineChans = append(tempConn.ConnRoutineChans, &authChan)
        guestMap.Store(authString, tempConn);
    }

    //
    go echo(conn, authString, &authChan);
}

func echo(conn *websocket.Conn, authString string, authChan *chan string) {

    var message msg;

    //TEST CHANNEL
    authValue, _ := guestMap.Load(authString);
    go sendToChans(authValue.(connStruct).ConnRoutineChans, "sup dude?")

    fmt.Println("got past send...");

    for true {
        select {
            case val := <-*authChan:
                // use value of channel
                fmt.Println("AuthChan for user #"+strconv.Itoa(myConnNumb)+" spat out: ", val)
            default:
                // if channels are empty, this is executed
        }

        readError := conn.ReadJSON(&message)
        fmt.Println("got past readJson...");

        if readError != nil || message.Key == "" {
            //DISCONNECT USER
            //.....
            return
        }

        //
        _key, _value := chief(message.Key, message.Value, &*conn, browserAndOS, authString)

        if writeError := conn.WriteJSON(_key + "|" + _value); writeError != nil {
            //...
            return
        }

        fmt.Println("got past writeJson...");
    }
}

func sendToChans(chans []*chan string, message string){
    for i := 0; i < len(chans); i++ {
        *chans[i] <- message
    }
}

我知道,一大段代码是吗?我注释掉了大部分...

无论如何,如果您曾经使用过 websocket,那么其中的大部分应该都非常熟悉:

1) func wsHandler() 每次用户连接时触发。它在guestMap(对于连接的每个唯一设备)中创建一个条目,其中包含一个connStruct,其中包含一个频道列表:ConnRoutineChans []*chan string。这一切都传递给:

2) echo(),这是一个为每个 websocket 连接不断运行的 goroutine。在这里,我只是测试向其他 运行 goroutines 发送消息,但似乎我的 for 循环实际上并没有持续触发。它仅在 websocket 从其连接的开放 tab/window 接收到消息时触发。 (如果有人能阐明这个机制,我很想知道为什么它不不断循环?)

3) 对于用户在给定设备上打开的每个 window 或选项卡,都有一个存储在数组中的 websocket 和通道。我希望能够向数组中的所有通道发送消息(本质上是该设备上打开 tabs/windows 的其他 goroutines)并在其他 goroutines 中接收消息以更改不断设置的一些变量 运行协程

我现在所拥有的仅适用于设备上的第一个连接,并且(当然)它会向自身发送 "sup dude?",因为它是当时阵列中的唯一通道。然后,如果您打开一个新标签(或什至很多),则消息根本不会发送给任何人!奇怪?...然后当我关闭所有选项卡(并且我注释掉的逻辑从 guestMap 中删除设备项)并启动一个新的设备会话时,仍然只有第一个连接得到它自己的消息。

我已经有一种方法可以将消息发送到设备上的所有其他 websocket,但是发送到 goroutine 似乎比我想象的要棘手一些。

回答我自己的问题:

首先,我已经从 sync.map 切换到法线贴图。其次,为了不让任何人同时对它 reading/writing,我创建了一个频道,您可以调用该频道在地图上执行任何 read/write 操作。我一直在尽最大努力使我的数据访问和操作能够快速执行,这样频道就不会那么容易拥挤。这是一个小例子:

package main

import (
    "fmt"
)

var (
  guestMap map[string]*guestStruct = make(map[string]*guestStruct);
  guestMapActionChan = make (chan actionStruct);

)

type actionStruct struct {
    Action      func([]interface{})[]interface{}
    Params      []interface{}
    ReturnChan  chan []interface{}
}

type guestStruct struct {
    Name string
    Numb int
}

func main(){
    //make chan listener
    go guestMapActionChanListener(guestMapActionChan)

    //some guest logs in...
    newGuest := guestStruct{Name: "Larry Josher", Numb: 1337}

    //add to the map
    addRetChan := make(chan []interface{})
    guestMapActionChan <- actionStruct{Action: guestMapAdd,
                                       Params: []interface{}{&newGuest},
                                       ReturnChan: addRetChan}
    addReturned := <-addRetChan

    fmt.Println(addReturned)
    fmt.Println("Also, numb was changed by listener to:", newGuest.Numb)

    // Same kind of thing for removing, except (of course) there's
    // a lot more logic to a real-life application.
}

func guestMapActionChanListener (c chan actionStruct){
    for{
        value := <-c;
        //
        returned := value.Action(value.Params);
        value.ReturnChan <- returned;
        close(value.ReturnChan)
    }
}

func guestMapAdd(params []interface{}) []interface{} {
    //.. do some parameter verification checks
    theStruct := params[0].(*guestStruct)
    name := theStruct.Name
    theStruct.Numb = 75
    guestMap[name] = &*theStruct

    return []interface{}{"Added '"+name+"' to the guestMap"}
}

对于连接之间的通信,我只是让每个套接字循环保持它们的 guestStruct,并有更多 guestMapActionChan 函数负责将数据分发给其他客人的 guestStructs

现在,我不会将此标记为正确答案,除非我得到一些关于如何以正确的方式做这样的事情的更好建议。但现在这是可行的,应该 保证 reading/writing 没有比赛到地图。

编辑:正确的方法应该是像我在(大部分)完成的项目 GopherGameServer

中那样使用 sync.Mutex