goroutine 的空闲唤醒(idle wake-ups)次数非常高

goroutines have high idle wake up calls

我在 Go 中使用 goroutine 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很正常:两个客户端都能收到 websocket 服务器推送的数据。但我怀疑自己的设置有问题,因为在活动监视器(Activity Monitor)中查看时,我的程序始终有 500–1500 次空闲唤醒,并且占用了超过 200% 的 CPU。对于两个 websocket 客户端这样简单的程序来说,这似乎不正常。

我只贴出了代码片段,以便减少阅读量(希望这样更容易理解);如果需要完整的代码,我也可以贴出来。下面是 main 函数中运行 ws 客户端的代码:

// comms receives the process signals that trigger shutdown. It has a buffer of
// one so a signal can be queued before the receive below is reached.
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
// ctx is cancelled on shutdown; both listeners watch ctx.Done().
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup

// One WaitGroup slot per listener goroutine; each listener defers wg.Done().
wg.Add(1)
go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)
wg.Add(1)
go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)

// Block until an interrupt/SIGTERM arrives, then cancel the listeners and wait
// for both of them to return.
<- comms
cancel()
wg.Wait()

下面是客户端运行 goroutine 的代码:
// PubListen installs the text-message handler for the public socket and closes
// the socket once ctx is cancelled. wg.Done is called when it returns.
//
// NOTE: this is the version from the question and it busy-loops. The select has
// a default case, so whenever ctx is not yet done the default branch runs
// immediately, re-assigns the same handler, and the for loop starts over
// without ever blocking.
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            // Shutdown requested: close the socket and leave the loop.
            log.Println("closing public socket")
            socket.Close()
            return
        default:
            // Runs on every iteration while ctx is still live; nothing here blocks.
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
                pubJsonDecoder(message, testing)
                //tradesParser(message);
            }
        }
    }
}

// PrivListen installs the text-message handler for the private socket and
// closes the socket once ctx is cancelled. wg.Done is called when it returns.
//
// NOTE: this is the version from the question and it busy-loops. The select has
// a default case, so whenever ctx is not yet done the default branch runs
// immediately, re-assigns the same handler, and the for loop starts over
// without ever blocking.
func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            // Shutdown requested: close the socket and leave the loop.
            log.Println("closing private socket")
            socket.Close()
            return
        default:
            // Runs on every iteration while ctx is still live; nothing here blocks.
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
            }
        }
    }
}

关于空闲唤醒率为何如此之高,大家有什么想法吗?我是否应该使用多线程而不是并发?在此先感谢您的帮助!

你在这里浪费 CPU(多余的循环):

  for {
       // ...
        default:
        // High CPU usage here.
        }
    }

尝试这样的事情:

 // PubListen registers the public-feed message handler once, then parks until
 // ctx is cancelled. On return the deferred calls close the socket and then
 // signal wg, in that order.
 func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    // Log every incoming text frame and hand it to the public JSON decoder.
    onText := func(message string, socket Socket) {
        log.Println(message)
        pubJsonDecoder(message, testing)
        //tradesParser(message);
    }
    socket.OnTextMessage = onText

    // Block here instead of spinning; the receive completes when ctx is cancelled.
    <-ctx.Done()
    log.Println("closing public socket")
}

// PrivListen registers the private-feed message handler once, then parks until
// ctx is cancelled. On return the deferred calls close the socket and then
// signal wg, in that order.
func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    // Log every incoming text frame.
    onText := func(message string, socket Socket) {
        log.Println(message)
    }
    socket.OnTextMessage = onText

    // Block here instead of spinning; the receive completes when ctx is cancelled.
    <-ctx.Done()
    log.Println("closing private socket")
}

这也可能有帮助:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

tl/dr:websockets 很难 :)

看起来你有几个在空转(spin)的 goroutine。在 for-select 语句的 default 分支中,你在给 OnTextMessage 赋处理函数。只要没有其他 case 就绪,default 分支就会被执行;而 default 分支里没有任何阻塞操作,所以 for 循环会不停地空转。两个这样空转的 goroutine 很可能各占满一个 CPU 核心。Websocket 属于网络 IO,这两个 goroutine 很可能是并行运行的,这就是你看到 200% 利用率的原因。

可以看看 gorilla/websocket 库。我不会说它比其他 websocket 库更好或更差,只是我在它上面有很多使用经验。

https://github.com/gorilla/websocket

下面是我用过很多次的一个实现。它的做法是:为特定类型的消息注册处理函数,收到该类型的消息时就触发对应的函数。假设消息中有一个值是 "type": "start-job",websocket 服务器就会调用你为 "start-job" 消息注册的处理函数。写起来的感觉就像为 HTTP 路由器编写端点一样。

包 serverws

context.go

package serverws

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

// ConnContext is the per-connection record used to track a connected websocket
// user. Its three string fields, joined with ":", form the connection's hash key.
type ConnContext struct {
    specialKey  string
    supportGzip string
    UserID      string
    mu          sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.
}

// HashKeyAsCtx rebuilds a ConnContext from a hash key of the form
// "specialKey:supportGzip:UserID". Any key that does not split into exactly
// three colon-separated parts is rejected with an error.
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
    parts := strings.Split(hashKey, ":")
    if len(parts) == 3 {
        // The mutex is left at its zero value, which is an unlocked mutex.
        return &ConnContext{
            specialKey:  parts[0],
            supportGzip: parts[1],
            UserID:      parts[2],
        }, nil
    }
    return nil, errors.New("Invalid Key received: " + hashKey)
}

// AsHashKey renders the context as "specialKey:supportGzip:UserID".
func (ctx *ConnContext) AsHashKey() string {
    return ctx.specialKey + ":" + ctx.supportGzip + ":" + ctx.UserID
}

// String renders the context's three fields in a human-readable, labelled form.
func (ctx *ConnContext) String() string {
    return fmt.Sprintf("specialkey: %s gzip %s auth %s", ctx.specialKey, ctx.supportGzip, ctx.UserID)
}

wshandler.go

package serverws

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/zerolog/log"
)

var (
    // receiveFunctionMap maps a message type name to the handler registered for
    // it through RegisterMessageType. It is a plain map; nothing in this file
    // guards it with a lock.
    receiveFunctionMap = make(map[string]ReceiveObjectFunc)
    // ctxHashMap maps a connection's hash key (ConnContext.AsHashKey) to its
    // *websocket.Conn.
    ctxHashMap         sync.Map
)

// ReceiveObjectFunc is the signature of a websocket request handler. It is
// invoked by processIncomingTextMsg with the connection the message arrived on,
// that connection's context, and the decoded "message" object of the payload.
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler upgrades HTTP requests to websocket connections and routes
// incoming text messages to the handlers registered with RegisterMessageType.
type WebSocketHandler struct {
    wsupgrader websocket.Upgrader
}

// WebSocketMessage is the envelope of a message sent over a websocket. Messages
// must have a conversation type so the server and the client JS know what is
// being discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
    // MessageType is read from the JSON "type" key; processIncomingTextMsg uses
    // it to look up the registered handler.
    MessageType string      `json:"type"`
    // Message is the JSON "message" key, left undecoded as an arbitrary value.
    Message     interface{} `json:"message"`
}

// NewWebSocketHandler returns a WebSocketHandler whose upgrader is configured
// with 4096-byte read and write buffers.
func NewWebSocketHandler() *WebSocketHandler {
    return &WebSocketHandler{
        wsupgrader: websocket.Upgrader{
            ReadBufferSize:  4096,
            WriteBufferSize: 4096,
        },
    }
}

// RegisterMessageType sets up an event bus for a message type.   When messages arrive from the client that match messageTypeName,
// the function you wrote to handle that message is then called.
//
// Registering the same messageTypeName again replaces the previous handler.
// The registry is a package-level map written here without any locking.
// NOTE(review): presumably all registration is meant to happen before
// connections are served — confirm against callers.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
    receiveFunctionMap[messageTypeName] = f
}

// onMessage is called for every data message read from the underlying
// websocket. Text messages are dispatched to processIncomingTextMsg; binary
// messages are currently accepted and dropped.
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
    switch msgType {
    case websocket.TextMessage:
        wsh.processIncomingTextMsg(conn, ctx, msg)
    case websocket.BinaryMessage:
        // Binary is usually some gzip text; nothing is done with it yet.
    }
}

// onOpen is called once the websocket connection has been established. It
// builds the ConnContext from the request's "specialKey" and "support_gzip"
// form values, closes and unregisters any connection already stored under the
// same hash key, and stores conn under that key.
//
// It returns an error only when the request's form cannot be parsed.
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
    if parseErr := r.ParseForm(); parseErr != nil {
        return nil, errors.New("parameter check error")
    }

    //user, err := gothic.GetFromSession("ID", r)
    user := "TestUser"

    // Only carry the user over when a user was resolved without error.
    userID := ""
    if user != "" && err == nil {
        userID = user
    }
    ctx = &ConnContext{
        specialKey:  r.FormValue("specialKey"),
        supportGzip: r.FormValue("support_gzip"),
        UserID:      userID,
    }

    key := ctx.AsHashKey()

    // A connection already registered under this key is replaced by the new one.
    if previous, found := ctxHashMap.Load(key); found {
        stale := previous.(*websocket.Conn)
        wsh.onClose(stale, ctx)
        stale.Close()
    }
    ctxHashMap.Store(key, conn)
    return ctx, nil
}

// onClose triggers when the underlying websocket has been closed down. It
// removes the registry entry for ctx's hash key; conn itself is not used and is
// not closed here.
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
    //log.Info().Msg(("client close itself as " + ctx.String()))
    wsh.closeConnWithCtx(ctx)
}

// onError triggers when a websocket connection breaks. It is currently a no-op:
// the only statement, the log call, is commented out, so errMsg is discarded.
func (wsh *WebSocketHandler) onError(errMsg string) {
    //log.Error().Msg(errMsg)
}

// HandleConn happens when a user connects to us at the listening point. It
// authenticates the user (placeholder), upgrades the HTTP request to a
// websocket, registers the connection through onOpen and then pumps incoming
// messages until the connection fails or is closed.
//
// The connection is always closed when HandleConn returns.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {
    user := ""
    if r.URL.Path == "/websocket" {
        user = "TestUser" // authenticate however you want
        if user == "" {
            fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
            // Reject explicitly; returning without writing anything would make
            // net/http answer with an implicit 200 OK.
            http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
            return
        }
    }
    // don't do this.  You need to check the origin, but this is here as a place holder
    wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
        return
    }
    defer conn.Close()

    ctx, err := wsh.onOpen(conn, r)
    if err != nil {
        // onOpen returns a nil context together with its error, so ctx must
        // not be touched on this path.
        log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
        return
    }

    if user != "" {
        ctx.UserID = user
    }
    conn.SetPingHandler(func(message string) error {
        // Best effort: a failed pong is deliberately not treated as fatal here;
        // a broken connection surfaces through ReadMessage below.
        _ = conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
        return nil
    })

    // Message pump for the underlying websocket connection
    for {
        t, msg, err := conn.ReadMessage()
        if err != nil {
            // Read errors are when the user closes the tab. Ignore.
            wsh.onClose(conn, ctx)
            return
        }

        switch t {
        case websocket.TextMessage, websocket.BinaryMessage:
            wsh.onMessage(conn, ctx, msg, t)
        case websocket.CloseMessage:
            wsh.onClose(conn, ctx)
            return
        case websocket.PingMessage:
        case websocket.PongMessage:
        }
    }
}

// closeConnWithCtx removes the registry entry stored under ctx's hash key.
func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
    ctxHashMap.Delete(ctx.AsHashKey())
}

// processIncomingTextMsg decodes a text frame and, when a handler is registered
// for its "type", calls that handler with the frame's "message" object.
//
// Frames that are not valid envelopes, or whose type has no handler, are
// dropped silently. A frame with a registered type whose "message" is not a
// JSON object is logged at debug level and dropped.
func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
    //log.Debug().Msg("CLIENT SAID " + string(msg))

    // First pass: read the envelope to learn the message type.
    var envelope WebSocketMessage
    if err := json.Unmarshal(msg, &envelope); err != nil {
        // Received garbage from the transmitter.
        return
    }

    // What kind of message is this?
    handler := receiveFunctionMap[envelope.MessageType]
    if handler == nil {
        return
    }

    // Second pass: get at the raw data underneath so the payload can be
    // passed to the handler as a generic map.
    raw := make(map[string]interface{})
    if err := json.Unmarshal(msg, &raw); err != nil {
        log.Debug().Msg("Nonsense sent over the websocket.")
        return
    }

    payload, isObject := raw["message"].(map[string]interface{})
    if !isObject {
        log.Debug().Msg("Nonsense sent over the websocket.")
        return
    }
    handler(conn, ctx, payload)
}

// socketWriteMu serializes the websocket writes issued by SendJSONToSocket.
// A gorilla connection supports only one concurrent writer, and the ConnContext
// values built inside SendJSONToSocket are fresh on every call, so their own
// mutexes cannot provide that exclusion. The lock is package-wide, which means
// a slow peer delays writes to other peers while its write is in flight.
var socketWriteMu sync.Mutex

// SendJSONToSocket marshals msg to JSON and writes it as a text message to
// every registered connection whose specialKey equals the first ":"-separated
// field of socketID.
//
// If msg cannot be marshalled, the error is reported through onError and
// nothing is sent. A connection whose write fails is unregistered and closed.
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
    message, err := json.Marshal(msg)
    if err != nil {
        wsh.onError("marshal websocket message: " + err.Error())
        return
    }

    // strings.Split always yields at least one element, so index 0 is safe.
    specialKey := strings.Split(socketID, ":")[0]

    ctxHashMap.Range(func(key interface{}, value interface{}) bool {
        ctx, err := HashKeyAsCtx(key.(string))
        if err != nil {
            wsh.onError(err.Error())
            return true
        }
        if ctx.specialKey != specialKey {
            return true
        }
        conn, ok := value.(*websocket.Conn)
        if !ok || conn == nil {
            return true
        }

        socketWriteMu.Lock()
        err = conn.WriteMessage(websocket.TextMessage, message)
        socketWriteMu.Unlock()

        if err != nil {
            wsh.onClose(conn, ctx)
            conn.Close()
            ctxHashMap.Delete(key) // Remove the websocket immediately
            //wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())
        }
        return true
    })
}

包 wsocket

types.go

package wsocket



// Acknowledgement is for ACKing simple messages and sending errors.
// It is serialized to JSON and sent back to the client.
type Acknowledgement struct {
    // ResponseID is serialized as "responseId".
    ResponseID string `json:"responseId"`
    // Status is a human-readable status line, serialized as "status".
    Status     string `json:"status"`
    // IPAddress is serialized as "ipaddress".
    IPAddress  string `json:"ipaddress"`
    // ErrorText is serialized as "errortext"; there is no omitempty, so an
    // empty string is still emitted.
    ErrorText  string `json:"errortext"`
}


wsocket.go

package wsocket

import (
    "fmt"
    server "project/serverws"
    "project/utils"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    // "github.com/mitchellh/mapstructure"
    "github.com/inconshreveable/log15"
)
var (
    WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
    WebSocketLocation string
    Log               log15.Logger = log15.New("package", "wsocket"
)

func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {

    WebSocket = socket
    WebSocketLocation = "example.mydomain.com"
    //WebSocketLocation = "example.mydomain.com"
    r.GET("/websocket", func(c *gin.Context) {
        socket.HandleConn(c.Writer, c.Request)

    })

socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "Hello",
            Status:     fmt.Sprintf("OK/%v", ctx.AuthID),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)
    })

socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "starting_job",
            Status:     fmt.Sprintf("%s is being dialed.", data["did"]),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)

    })

此实现适用于 Web 应用程序。下面是 JavaScript 客户端的简化版本。使用这个实现可以处理大量并发连接;通信时你只需要定义对象/结构体,让其中的 responseId 与下面 switch 语句中的某个 case 匹配(客户端基本上就是一个很长的 switch 语句),然后把它序列化并发送给另一端,另一端会进行确认。我在几个生产环境中都运行着这个实现的某些版本。

websocket.js

$(() => {

    // Dispatches a decoded server message on its responseId:
    //   "Hello"        - heartbeat acknowledgement, only logged;
    //   "Notification" - shows a popup: a danger alert with errortext when it is
    //                    non-empty, otherwise a success alert with status.
    // Any other responseId is ignored.
    function wsMessage(object) {
        // Shows one popup; the two Notification variants differ only in their
        // markup and alert type. The markup is passed to $.notify as-is.
        const popup = (html, type) => $.notify({
            // options
            message: html,
        }, {
            // settings
            type: type,
            offset: 50,
            placement: {
                align: 'center',
            }
        });

        const id = object.responseId;

        if (id === "Hello") { // HELLO! :-)
            console.log("Heartbeat received, we're connected.");
            return;
        }
        if (id !== "Notification") {
            return;
        }

        if (object.errortext != "") {
            popup('<center><B><i class="fas fa-exclamation-triangle"></i>&nbsp;&nbsp;' + object.errortext + '</B></center>', 'danger');
        } else {
            popup('<center><B>' + object.status + '</B></center>', 'success');
        }
    }



    $(document).ready(function () {

        // Handle of the pending heartbeat timeout. Keeping it lets a reconnect
        // cancel the previous heartbeat chain instead of stacking a second one
        // on top of it (the old chain would otherwise keep firing against the
        // newly opened socket).
        var heartbeatTimer = null;

        // Cancels the pending heartbeat, if any.
        function stopHeartbeat() {
            if (heartbeatTimer !== null) {
                clearTimeout(heartbeatTimer);
                heartbeatTimer = null;
            }
        }

        // Sends a "Hello" message and re-arms itself every 24 seconds for as
        // long as the socket is open (readyState 1).
        function heartbeat() {
            stopHeartbeat();
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
            heartbeatTimer = setTimeout(heartbeat, 24000);
        }

        // Opens the websocket (already served over wss) and wires its handlers.
        // On close it reconnects after 3 seconds with a fresh random key.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");

                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }

            };

            websocket.onerror = function (event) {
                // Error events carry no `data` field; log the event itself.
                console.log("WEBSOCKET ERROR", event);

            };

            websocket.onmessage = function (event) {
                wsMessage(JSON.parse(event.data));
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                stopHeartbeat();
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                // Tolerate a counter that was never initialised elsewhere.
                websocketreconnects = (typeof websocketreconnects === 'number' ? websocketreconnects : 0) + 1;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () { wireUpWebsocket(); }, 3000);
            };
        }

        wireUpWebsocket();
    });

});

// Returns the value of the cookie called `name`, or undefined when
// document.cookie does not contain exactly one "; name=" occurrence.
function getCookie(name) {
    // Prefix with "; " so the first cookie is delimited like all the others.
    const pieces = ("; " + document.cookie).split("; " + name + "=");
    if (pieces.length !== 2) {
        return undefined;
    }
    // Everything up to the next ";" is the cookie's value.
    const [cookieValue] = pieces[1].split(";");
    return cookieValue;
}

// Writes cookie `cname` with value `cvalue`, expiring `exdays` days from now,
// scoped to path "/".
function setCookie(cname, cvalue, exdays) {
    const msPerDay = 24 * 60 * 60 * 1000;
    const expiry = new Date(Date.now() + (exdays * msPerDay));
    document.cookie = `${cname}=${cvalue};expires=${expiry.toUTCString()};path=/`;
}

在无限循环中一遍又一遍地分配处理函数肯定是行不通的。

https://github.com/gorilla/websocket