将 websocket 循环与 Golang 中的通道同步
Syncing websocket loops with channels in Golang
我在尝试为给定用户保持某些 websocket 同步时面临两难境地。这是基本设置:
type msg struct {
Key string
Value string
}
type connStruct struct {
//...
ConnRoutineChans []*chan string
LoggedIn bool
Login string
//...
Sockets []*websocket.Conn
}
var (
//...
/* LIST OF CONNECTED USERS AN THEIR IP ADDRESSES */
guestMap sync.Map
)
func main() {
post("Started...")
rand.Seed(time.Now().UTC().UnixNano())
http.HandleFunc("/wss", wsHandler)
panic(http.ListenAndServeTLS("...", "...", "...", nil))
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Origin")+":8080" != "https://...:8080" {
http.Error(w, "Origin not allowed", 403)
fmt.Println("Client origin not allowed! (https://"+r.Host+")")
fmt.Println("r.Header Origin: "+r.Header.Get("Origin"))
return
}
///
conn, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024)
if err != nil {
http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
fmt.Println("Could not open websocket connection with client!")
}
//ADD CONNECTION TO guestMap IF CONNECTION IS nil
var authString string = /*gets device identity*/;
var authChan chan string = make(chan string);
authValue, authOK := guestMap.Load(authString);
if !authOK {
// NO SESSION, CREATE A NEW ONE
newSession = getSession();
//defer newSession.Close();
guestMap.Store(authString, connStruct{ LoggedIn: false,
ConnRoutineChans: []*chan string{&authChan},
Login: "",
Sockets: []*websocket.Conn{conn}
/* .... */ });
}else{
//SESSION STARTED, ADD NEW SOCKET TO Sockets
var tempConn connStruct = authValue.(connStruct);
tempConn.Sockets = append(tempConn.Sockets, conn);
tempConn.ConnRoutineChans = append(tempConn.ConnRoutineChans, &authChan)
guestMap.Store(authString, tempConn);
}
//
go echo(conn, authString, &authChan);
}
func echo(conn *websocket.Conn, authString string, authChan *chan string) {
var message msg;
//TEST CHANNEL
authValue, _ := guestMap.Load(authString);
go sendToChans(authValue.(connStruct).ConnRoutineChans, "sup dude?")
fmt.Println("got past send...");
for true {
select {
case val := <-*authChan:
// use value of channel
fmt.Println("AuthChan for user #"+strconv.Itoa(myConnNumb)+" spat out: ", val)
default:
// if channels are empty, this is executed
}
readError := conn.ReadJSON(&message)
fmt.Println("got past readJson...");
if readError != nil || message.Key == "" {
//DISCONNECT USER
//.....
return
}
//
_key, _value := chief(message.Key, message.Value, &*conn, browserAndOS, authString)
if writeError := conn.WriteJSON(_key + "|" + _value); writeError != nil {
//...
return
}
fmt.Println("got past writeJson...");
}
}
func sendToChans(chans []*chan string, message string){
for i := 0; i < len(chans); i++ {
*chans[i] <- message
}
}
我知道,一大段代码是吗?我注释掉了大部分...
无论如何,如果您曾经使用过 websocket,那么其中的大部分应该都非常熟悉:
1) func wsHandler()
每次用户连接时触发。它在guestMap
(对于连接的每个唯一设备)中创建一个条目,其中包含一个connStruct
,其中包含一个频道列表:ConnRoutineChans []*chan string
。这一切都传递给:
2) echo()
,这是一个为每个 websocket 连接不断运行的 goroutine。在这里,我只是测试向其他 运行 goroutines 发送消息,但似乎我的 for 循环实际上并没有持续触发。它仅在 websocket 从其连接的开放 tab/window 接收到消息时触发。 (如果有人能阐明这个机制,我很想知道为什么它不不断循环?)
3) 对于用户在给定设备上打开的每个 window 或选项卡,都有一个存储在数组中的 websocket 和通道。我希望能够向数组中的所有通道发送消息(本质上是该设备上打开 tabs/windows 的其他 goroutines)并在其他 goroutines 中接收消息以更改不断设置的一些变量 运行协程
我现在所拥有的仅适用于设备上的第一个连接,并且(当然)它会向自身发送 "sup dude?",因为它是当时阵列中的唯一通道。然后,如果您打开一个新标签(或什至很多),则消息根本不会发送给任何人!奇怪?...然后当我关闭所有选项卡(并且我注释掉的逻辑从 guestMap
中删除设备项)并启动一个新的设备会话时,仍然只有第一个连接得到它自己的消息。
我已经有一种方法可以将消息发送到设备上的所有其他 websocket,但是发送到 goroutine 似乎比我想象的要棘手一些。
回答我自己的问题:
首先,我已经从 sync.map 切换到法线贴图。其次,为了不让任何人同时对它 reading/writing,我创建了一个频道,您可以调用该频道在地图上执行任何 read/write 操作。我一直在尽最大努力使我的数据访问和操作能够快速执行,这样频道就不会那么容易拥挤。这是一个小例子:
package main
import (
"fmt"
)
var (
guestMap map[string]*guestStruct = make(map[string]*guestStruct);
guestMapActionChan = make (chan actionStruct);
)
type actionStruct struct {
Action func([]interface{})[]interface{}
Params []interface{}
ReturnChan chan []interface{}
}
type guestStruct struct {
Name string
Numb int
}
func main(){
//make chan listener
go guestMapActionChanListener(guestMapActionChan)
//some guest logs in...
newGuest := guestStruct{Name: "Larry Josher", Numb: 1337}
//add to the map
addRetChan := make(chan []interface{})
guestMapActionChan <- actionStruct{Action: guestMapAdd,
Params: []interface{}{&newGuest},
ReturnChan: addRetChan}
addReturned := <-addRetChan
fmt.Println(addReturned)
fmt.Println("Also, numb was changed by listener to:", newGuest.Numb)
// Same kind of thing for removing, except (of course) there's
// a lot more logic to a real-life application.
}
func guestMapActionChanListener (c chan actionStruct){
for{
value := <-c;
//
returned := value.Action(value.Params);
value.ReturnChan <- returned;
close(value.ReturnChan)
}
}
func guestMapAdd(params []interface{}) []interface{} {
//.. do some parameter verification checks
theStruct := params[0].(*guestStruct)
name := theStruct.Name
theStruct.Numb = 75
guestMap[name] = &*theStruct
return []interface{}{"Added '"+name+"' to the guestMap"}
}
对于连接之间的通信,我只是让每个套接字循环保持它们的 guestStruct
,并有更多 guestMapActionChan
函数负责将数据分发给其他客人的 guestStruct
s
现在,我不会将此标记为正确答案,除非我得到一些关于如何以正确的方式做这样的事情的更好建议。但现在这是可行的,应该 保证 reading/writing 没有比赛到地图。
编辑:正确的方法应该是像我在(大部分)完成的项目 GopherGameServer
中那样使用 sync.Mutex
我在尝试为给定用户保持某些 websocket 同步时面临两难境地。这是基本设置:
type msg struct {
Key string
Value string
}
type connStruct struct {
//...
ConnRoutineChans []*chan string
LoggedIn bool
Login string
//...
Sockets []*websocket.Conn
}
var (
//...
/* LIST OF CONNECTED USERS AN THEIR IP ADDRESSES */
guestMap sync.Map
)
func main() {
post("Started...")
rand.Seed(time.Now().UTC().UnixNano())
http.HandleFunc("/wss", wsHandler)
panic(http.ListenAndServeTLS("...", "...", "...", nil))
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Origin")+":8080" != "https://...:8080" {
http.Error(w, "Origin not allowed", 403)
fmt.Println("Client origin not allowed! (https://"+r.Host+")")
fmt.Println("r.Header Origin: "+r.Header.Get("Origin"))
return
}
///
conn, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024)
if err != nil {
http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
fmt.Println("Could not open websocket connection with client!")
}
//ADD CONNECTION TO guestMap IF CONNECTION IS nil
var authString string = /*gets device identity*/;
var authChan chan string = make(chan string);
authValue, authOK := guestMap.Load(authString);
if !authOK {
// NO SESSION, CREATE A NEW ONE
newSession = getSession();
//defer newSession.Close();
guestMap.Store(authString, connStruct{ LoggedIn: false,
ConnRoutineChans: []*chan string{&authChan},
Login: "",
Sockets: []*websocket.Conn{conn}
/* .... */ });
}else{
//SESSION STARTED, ADD NEW SOCKET TO Sockets
var tempConn connStruct = authValue.(connStruct);
tempConn.Sockets = append(tempConn.Sockets, conn);
tempConn.ConnRoutineChans = append(tempConn.ConnRoutineChans, &authChan)
guestMap.Store(authString, tempConn);
}
//
go echo(conn, authString, &authChan);
}
func echo(conn *websocket.Conn, authString string, authChan *chan string) {
var message msg;
//TEST CHANNEL
authValue, _ := guestMap.Load(authString);
go sendToChans(authValue.(connStruct).ConnRoutineChans, "sup dude?")
fmt.Println("got past send...");
for true {
select {
case val := <-*authChan:
// use value of channel
fmt.Println("AuthChan for user #"+strconv.Itoa(myConnNumb)+" spat out: ", val)
default:
// if channels are empty, this is executed
}
readError := conn.ReadJSON(&message)
fmt.Println("got past readJson...");
if readError != nil || message.Key == "" {
//DISCONNECT USER
//.....
return
}
//
_key, _value := chief(message.Key, message.Value, &*conn, browserAndOS, authString)
if writeError := conn.WriteJSON(_key + "|" + _value); writeError != nil {
//...
return
}
fmt.Println("got past writeJson...");
}
}
func sendToChans(chans []*chan string, message string){
for i := 0; i < len(chans); i++ {
*chans[i] <- message
}
}
我知道,一大段代码是吗?我注释掉了大部分...
无论如何,如果您曾经使用过 websocket,那么其中的大部分应该都非常熟悉:
1) func wsHandler()
每次用户连接时触发。它在guestMap
(对于连接的每个唯一设备)中创建一个条目,其中包含一个connStruct
,其中包含一个频道列表:ConnRoutineChans []*chan string
。这一切都传递给:
2) echo()
,这是一个为每个 websocket 连接不断运行的 goroutine。在这里,我只是测试向其他 运行 goroutines 发送消息,但似乎我的 for 循环实际上并没有持续触发。它仅在 websocket 从其连接的开放 tab/window 接收到消息时触发。 (如果有人能阐明这个机制,我很想知道为什么它不不断循环?)
3) 对于用户在给定设备上打开的每个 window 或选项卡,都有一个存储在数组中的 websocket 和通道。我希望能够向数组中的所有通道发送消息(本质上是该设备上打开 tabs/windows 的其他 goroutines)并在其他 goroutines 中接收消息以更改不断设置的一些变量 运行协程
我现在所拥有的仅适用于设备上的第一个连接,并且(当然)它会向自身发送 "sup dude?",因为它是当时阵列中的唯一通道。然后,如果您打开一个新标签(或什至很多),则消息根本不会发送给任何人!奇怪?...然后当我关闭所有选项卡(并且我注释掉的逻辑从 guestMap
中删除设备项)并启动一个新的设备会话时,仍然只有第一个连接得到它自己的消息。
我已经有一种方法可以将消息发送到设备上的所有其他 websocket,但是发送到 goroutine 似乎比我想象的要棘手一些。
回答我自己的问题:
首先,我已经从 sync.map 切换到法线贴图。其次,为了不让任何人同时对它 reading/writing,我创建了一个频道,您可以调用该频道在地图上执行任何 read/write 操作。我一直在尽最大努力使我的数据访问和操作能够快速执行,这样频道就不会那么容易拥挤。这是一个小例子:
package main
import (
"fmt"
)
var (
guestMap map[string]*guestStruct = make(map[string]*guestStruct);
guestMapActionChan = make (chan actionStruct);
)
type actionStruct struct {
Action func([]interface{})[]interface{}
Params []interface{}
ReturnChan chan []interface{}
}
type guestStruct struct {
Name string
Numb int
}
func main(){
//make chan listener
go guestMapActionChanListener(guestMapActionChan)
//some guest logs in...
newGuest := guestStruct{Name: "Larry Josher", Numb: 1337}
//add to the map
addRetChan := make(chan []interface{})
guestMapActionChan <- actionStruct{Action: guestMapAdd,
Params: []interface{}{&newGuest},
ReturnChan: addRetChan}
addReturned := <-addRetChan
fmt.Println(addReturned)
fmt.Println("Also, numb was changed by listener to:", newGuest.Numb)
// Same kind of thing for removing, except (of course) there's
// a lot more logic to a real-life application.
}
func guestMapActionChanListener (c chan actionStruct){
for{
value := <-c;
//
returned := value.Action(value.Params);
value.ReturnChan <- returned;
close(value.ReturnChan)
}
}
func guestMapAdd(params []interface{}) []interface{} {
//.. do some parameter verification checks
theStruct := params[0].(*guestStruct)
name := theStruct.Name
theStruct.Numb = 75
guestMap[name] = &*theStruct
return []interface{}{"Added '"+name+"' to the guestMap"}
}
对于连接之间的通信,我只是让每个套接字循环保持它们的 guestStruct
,并有更多 guestMapActionChan
函数负责将数据分发给其他客人的 guestStruct
s
现在,我不会将此标记为正确答案,除非我得到一些关于如何以正确的方式做这样的事情的更好建议。但现在这是可行的,应该 保证 reading/writing 没有比赛到地图。
编辑:正确的方法应该是像我在(大部分)完成的项目 GopherGameServer
中那样使用sync.Mutex