Goroutine 安全通道关闭实际上并没有关闭 websocket

Goroutine safe channel close doesn't actually close webscoket

这是一个棘手的问题,让我很烦恼。

本质上,我编写了一个集成微服务,它使用 Go 客户端提供来自 Binance 加密货币交易所的数据流。客户端发送开始消息,开始符号的数据流,并在某个时候发送关闭消息以停止流。我的实现基本上是这样的:


func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
    
    switch clientType {

    case bn.SPOT_LIVE:
        wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
        wsErrHandler := c.handlers.klineHandler.ErrHandler

        _, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
        if err != nil {
            fmt.Println(err)
            return err
        } else {
            c.state.clientSymChanMap[clientType][symbol] = stopC
            return nil
        }
  ... 
}

clientSymChanMap 将 stopChannel 存储在嵌套的哈希图中,以便我稍后可以检索停止通道以停止数据馈送。停止功能已相应实现:


func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
    //mtd := "StopDataStream: "

    stopC := c.state.clientSymChanMap[clientType][symbol]

    if isClosed(stopC) {
        DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
    } else {
        close(stopC)
    }
    // Delete  channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
    delete(c.state.clientSymChanMap[clientType], symbol)
    return
}

为了防止已经关闭的频道出现恐慌,我使用了一个检查功能,如果频道已经关闭,returns 为真。


func isClosed(ch <-chan struct{}) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}

看起来不错,但有一个问题。当我 运行 代码只有一个符号的起始数据时,它完全按照预期启动和关闭数据馈送。

但是,当启动多个数据馈送时,上述代码不会以某种方式关闭 websocket,只会永远保持流式传输数据。如果没有 isClosed 检查,我会因为尝试关闭一个已关闭的通道而感到恐慌,但是有了检查,什么都不会关闭。

查看 implementation of the above binance.WsKlineServe 函数时,很明显它只是在每次调用时包装一个新的 websocket,然后 returns 完成和停止通道。

文档给出了以下用法示例:


wsKlineHandler := func(event *binance.WsKlineEvent) {
    fmt.Println(event)
}
errHandler := func(err error) {
    fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
    fmt.Println(err)
    return
}
<-doneC 

因为 doneC 通道实际上会阻塞,所以我删除了它并认为存储 stopC 通道然后稍后使用它来停止数据馈送会起作用。但是,它只针对一个实例这样做。当打开多个流时,这将不再起作用。

知道这是怎么回事以及如何解决吗?

首先,这很危险:

if isClosed(stopC) {
    DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
    close(stopC) // <- can't be sure channel is still open
}

无法保证在您对通道状态进行轮询检查后,通道在下一行代码中仍将处于相同状态。因此,如果同时调用这段代码,理论上它可能会出现恐慌。


如果您希望在通道关闭时发生异步操作 - 最好从它自己的 goroutine 中明确地执行此操作。所以你可以试试这个:

go func() {

    stopC := c.state.clientSymChanMap[clientType][symbol]
    <-stopC
    // stopC definitely closed now
    delete(c.state.clientSymChanMap[clientType], symbol)
}()

P.S。您确实需要在您的地图上使用某种互斥体,因为删除是异步的 - 您需要确保添加到地图的任何内容都不会与此发生数据竞争。

P.P.S 通道在超出范围时会被 GC 回收。如果您不再阅读它 - 它们不需要明确关闭即可由 GC 回收。

使用通道来停止 goroutine 或关闭某些东西是非常棘手的。有很多事情你可能会做错或忘记做。

context.WithCancel 将复杂性抽象化,使代码更具可读性和可维护性。

一些代码片段:

ctx, cancel := context.WitchCancel(context.TODO())
TheThingToCancel(ctx, ...)

// Whenever you want to stop TheThingToCancel. Can be called multiple times.
cancel()

然后在 for 循环中,您通常会有这样的 select

for {
    select {
    case <-ctx.Done():
        return
    default:
    }

    // do stuff
}

这里有一些代码更接近您打开连接的具体情况:

func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
    ctx, cancel := context.WithCancel(ctx)

    conn, err := net.Dial("tcp", ":12345")
    if err != nil {
        cancel()
        return nil, err
    }

    go func() {
        <-ctx.Done()
        _ = conn.Close()
    }()

    go func() {
        defer func() {
            _ = conn.Close()
            // make sure context is always cancelled to avoid goroutine leak
            cancel()
        }()

        var bts = make([]byte, 1024)
        for {
            n, err := conn.Read(bts)
            if err != nil {
                return
            }
            fmt.Println(bts[:n])
        }
    }()

    return cancel, nil
}

它returns cancel 功能可以从外部关闭它。

可以多次取消上下文,而无需 panic 就像多次关闭频道一样。这是一个优势。您还可以从其他上下文派生上下文,从而关闭许多上下文,这些上下文都通过关闭父上下文来停止不同的例程。精心设计,这对于关闭属于一起的不同例程非常强大,这些例程也需要能够单独关闭。