Goroutine 安全通道关闭实际上并没有关闭 websocket
Goroutine safe channel close doesn't actually close webscoket
这是一个棘手的问题,让我很烦恼。
本质上,我编写了一个集成微服务,它使用 Go 客户端提供来自 Binance 加密货币交易所的数据流。客户端发送开始消息,开始符号的数据流,并在某个时候发送关闭消息以停止流。我的实现基本上是这样的:
func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
switch clientType {
case bn.SPOT_LIVE:
wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
wsErrHandler := c.handlers.klineHandler.ErrHandler
_, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
if err != nil {
fmt.Println(err)
return err
} else {
c.state.clientSymChanMap[clientType][symbol] = stopC
return nil
}
...
}
clientSymChanMap 将 stopChannel 存储在嵌套的哈希图中,以便我稍后可以检索停止通道以停止数据馈送。停止功能已相应实现:
func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
//mtd := "StopDataStream: "
stopC := c.state.clientSymChanMap[clientType][symbol]
if isClosed(stopC) {
DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
close(stopC)
}
// Delete channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
delete(c.state.clientSymChanMap[clientType], symbol)
return
}
为了防止已经关闭的频道出现恐慌,我使用了一个检查功能,如果频道已经关闭,returns 为真。
func isClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}
看起来不错,但有一个问题。当我 运行 代码只有一个符号的起始数据时,它完全按照预期启动和关闭数据馈送。
但是,当启动多个数据馈送时,上述代码不会以某种方式关闭 websocket,只会永远保持流式传输数据。如果没有 isClosed 检查,我会因为尝试关闭一个已关闭的通道而感到恐慌,但是有了检查,什么都不会关闭。
查看 implementation of the above binance.WsKlineServe 函数时,很明显它只是在每次调用时包装一个新的 websocket,然后 returns 完成和停止通道。
文档给出了以下用法示例:
wsKlineHandler := func(event *binance.WsKlineEvent) {
fmt.Println(event)
}
errHandler := func(err error) {
fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
fmt.Println(err)
return
}
<-doneC
因为 doneC 通道实际上会阻塞,所以我删除了它并认为存储 stopC 通道然后稍后使用它来停止数据馈送会起作用。但是,它只针对一个实例这样做。当打开多个流时,这将不再起作用。
知道这是怎么回事以及如何解决吗?
首先,这很危险:
if isClosed(stopC) {
DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
close(stopC) // <- can't be sure channel is still open
}
无法保证在您对通道状态进行轮询检查后,通道在下一行代码中仍将处于相同状态。因此,如果同时调用这段代码,理论上它可能会出现恐慌。
如果您希望在通道关闭时发生异步操作 - 最好从它自己的 goroutine 中明确地执行此操作。所以你可以试试这个:
go func() {
stopC := c.state.clientSymChanMap[clientType][symbol]
<-stopC
// stopC definitely closed now
delete(c.state.clientSymChanMap[clientType], symbol)
}()
P.S。您确实需要在您的地图上使用某种互斥体,因为删除是异步的 - 您需要确保添加到地图的任何内容都不会与此发生数据竞争。
P.P.S 通道在超出范围时会被 GC 回收。如果您不再阅读它 - 它们不需要明确关闭即可由 GC 回收。
使用通道来停止 goroutine 或关闭某些东西是非常棘手的。有很多事情你可能会做错或忘记做。
context.WithCancel
将复杂性抽象化,使代码更具可读性和可维护性。
一些代码片段:
ctx, cancel := context.WitchCancel(context.TODO())
TheThingToCancel(ctx, ...)
// Whenever you want to stop TheThingToCancel. Can be called multiple times.
cancel()
然后在 for 循环中,您通常会有这样的 select
:
for {
select {
case <-ctx.Done():
return
default:
}
// do stuff
}
这里有一些代码更接近您打开连接的具体情况:
func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(ctx)
conn, err := net.Dial("tcp", ":12345")
if err != nil {
cancel()
return nil, err
}
go func() {
<-ctx.Done()
_ = conn.Close()
}()
go func() {
defer func() {
_ = conn.Close()
// make sure context is always cancelled to avoid goroutine leak
cancel()
}()
var bts = make([]byte, 1024)
for {
n, err := conn.Read(bts)
if err != nil {
return
}
fmt.Println(bts[:n])
}
}()
return cancel, nil
}
它returns cancel
功能可以从外部关闭它。
可以多次取消上下文,而无需 panic
就像多次关闭频道一样。这是一个优势。您还可以从其他上下文派生上下文,从而关闭许多上下文,这些上下文都通过关闭父上下文来停止不同的例程。精心设计,这对于关闭属于一起的不同例程非常强大,这些例程也需要能够单独关闭。
这是一个棘手的问题,让我很烦恼。
本质上,我编写了一个集成微服务,它使用 Go 客户端提供来自 Binance 加密货币交易所的数据流。客户端发送开始消息,开始符号的数据流,并在某个时候发送关闭消息以停止流。我的实现基本上是这样的:
func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
switch clientType {
case bn.SPOT_LIVE:
wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
wsErrHandler := c.handlers.klineHandler.ErrHandler
_, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
if err != nil {
fmt.Println(err)
return err
} else {
c.state.clientSymChanMap[clientType][symbol] = stopC
return nil
}
...
}
clientSymChanMap 将 stopChannel 存储在嵌套的哈希图中,以便我稍后可以检索停止通道以停止数据馈送。停止功能已相应实现:
func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
//mtd := "StopDataStream: "
stopC := c.state.clientSymChanMap[clientType][symbol]
if isClosed(stopC) {
DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
close(stopC)
}
// Delete channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
delete(c.state.clientSymChanMap[clientType], symbol)
return
}
为了防止已经关闭的频道出现恐慌,我使用了一个检查功能,如果频道已经关闭,returns 为真。
func isClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}
看起来不错,但有一个问题。当我 运行 代码只有一个符号的起始数据时,它完全按照预期启动和关闭数据馈送。
但是,当启动多个数据馈送时,上述代码不会以某种方式关闭 websocket,只会永远保持流式传输数据。如果没有 isClosed 检查,我会因为尝试关闭一个已关闭的通道而感到恐慌,但是有了检查,什么都不会关闭。
查看 implementation of the above binance.WsKlineServe 函数时,很明显它只是在每次调用时包装一个新的 websocket,然后 returns 完成和停止通道。
文档给出了以下用法示例:
wsKlineHandler := func(event *binance.WsKlineEvent) {
fmt.Println(event)
}
errHandler := func(err error) {
fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
fmt.Println(err)
return
}
<-doneC
因为 doneC 通道实际上会阻塞,所以我删除了它并认为存储 stopC 通道然后稍后使用它来停止数据馈送会起作用。但是,它只针对一个实例这样做。当打开多个流时,这将不再起作用。
知道这是怎么回事以及如何解决吗?
首先,这很危险:
if isClosed(stopC) {
DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
close(stopC) // <- can't be sure channel is still open
}
无法保证在您对通道状态进行轮询检查后,通道在下一行代码中仍将处于相同状态。因此,如果同时调用这段代码,理论上它可能会出现恐慌。
如果您希望在通道关闭时发生异步操作 - 最好从它自己的 goroutine 中明确地执行此操作。所以你可以试试这个:
go func() {
stopC := c.state.clientSymChanMap[clientType][symbol]
<-stopC
// stopC definitely closed now
delete(c.state.clientSymChanMap[clientType], symbol)
}()
P.S。您确实需要在您的地图上使用某种互斥体,因为删除是异步的 - 您需要确保添加到地图的任何内容都不会与此发生数据竞争。
P.P.S 通道在超出范围时会被 GC 回收。如果您不再阅读它 - 它们不需要明确关闭即可由 GC 回收。
使用通道来停止 goroutine 或关闭某些东西是非常棘手的。有很多事情你可能会做错或忘记做。
context.WithCancel
将复杂性抽象化,使代码更具可读性和可维护性。
一些代码片段:
ctx, cancel := context.WitchCancel(context.TODO())
TheThingToCancel(ctx, ...)
// Whenever you want to stop TheThingToCancel. Can be called multiple times.
cancel()
然后在 for 循环中,您通常会有这样的 select
:
for {
select {
case <-ctx.Done():
return
default:
}
// do stuff
}
这里有一些代码更接近您打开连接的具体情况:
func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(ctx)
conn, err := net.Dial("tcp", ":12345")
if err != nil {
cancel()
return nil, err
}
go func() {
<-ctx.Done()
_ = conn.Close()
}()
go func() {
defer func() {
_ = conn.Close()
// make sure context is always cancelled to avoid goroutine leak
cancel()
}()
var bts = make([]byte, 1024)
for {
n, err := conn.Read(bts)
if err != nil {
return
}
fmt.Println(bts[:n])
}
}()
return cancel, nil
}
它returns cancel
功能可以从外部关闭它。
可以多次取消上下文,而无需 panic
就像多次关闭频道一样。这是一个优势。您还可以从其他上下文派生上下文,从而关闭许多上下文,这些上下文都通过关闭父上下文来停止不同的例程。精心设计,这对于关闭属于一起的不同例程非常强大,这些例程也需要能够单独关闭。