如何使用 context.WithCancel 启动和停止每个会话的心跳?
How to start & stop heartbeat per session using context.WithCancel?
我目前正在为 TypeDB 实现 Golang 客户端,并且正在为他们基于会话的心跳约定而苦苦挣扎。通常,你为每个客户端实现心跳,这样相对容易,只需 运行 在后台运行一个 gorountine,每隔几秒发送一次心跳。
然而,TypeDB 选择在每个会话基础上实施心跳(他们称之为脉冲)。这意味着,每次创建新会话时,我都必须开始使用单独的 GoRoutine 监视该会话。反之,如果客户端关闭了一个session,我就得停止监听。特别难看的是,我还必须每隔一段时间检查一次停滞的会话。切换到每个客户端心跳时存在 GH 问题,但没有 ETA,因此我必须使会话心跳起作用以防止服务端会话终止。
到目前为止,我的解决方案:
- 创建一个新会话
- 打开该会话并检查错误
- 如果没有错误,将会话添加到由会话 ID 键控的哈希映射中
这似乎暂时有效。代码,仅供上下文参考:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session.go
为了监控每个会话,我正在考虑两个问题:
香奈儿关闭多个 gorountines 有点棘手,可能会导致竞争条件。
我需要某种错误组来捕获心跳故障,即在服务器关闭或网络 link 错误的情况下。
考虑到所有这些,我相信 context.WithCancel
可能是安全而明智的解决方案。
到目前为止我想到的是:
- 将全局上下文作为参数传递给心跳函数
- 为每个调用 heartbeat 的会话创建一个新的上下文 WithCancel
- 运行 GoRoutine 中的心跳直到调用取消(通过 stopMonitoring)或发生错误
我不太清楚的是,我如何跟踪从每个跟踪会话返回的所有取消函数,以确保我关闭与要关闭的会话匹配的正确 GoRotuine?
感谢您提供解决此问题的任何提示。
代码:
func (s SessionManager) startMonitorSession(sessionID []byte) {
// How do I track each goRoutine per session
}
func (s SessionManager) stopMonitorSession(sessionID []byte) {
// How do I call the correct cancel function to stop the GoRoutine matching the session?
}
func (s SessionManager) runHeartbeat(ctx context.Context, sessionID []byte) context.CancelFunc {
// Create a new context, with its cancellation function from the original context
ctx, cancel := context.WithCancel(ctx)
go func() {
select {
case <-ctx.Done():
fmt.Println("Stopped monitoring session: ")
default:
err := s.sendPulseRequest(sessionID)
// If this operation returns an error
// cancel all operations using this local context created above
if err != nil {
cancel()
}
fmt.Println("done")
}
}()
// return cancel function for call site to close at a later stage
return cancel
}
func (s SessionManager) sendPulseRequest(sessionID []byte) error {
mtd := "sendPulse: "
req := requests.GetSessionPulseReq(sessionID)
res, pulseErr := s.client.client.SessionPulse(s.client.ctx, req)
if pulseErr != nil {
dbgPrint(mtd, "Heartbeat error. Close session")
return pulseErr
}
if res.Alive == false {
dbgPrint(mtd, "Server not alive anymore. Close session")
closeErr := s.CloseSession(sessionID)
if closeErr != nil {
return closeErr
}
}
// no error
return nil
}
更新:
感谢评论,我通过将会话和 CancelFunc 包装在一个名为 TypeDBSession 的专用结构中设法解决了大部分问题。
这样,停止函数只是从结构中拉取 CancelFunc,调用它,然后停止监视 GoRoutine。通过一些更多的调整,测试似乎通过了,尽管这暂时不是并发安全的。
话虽这么说,但这是一个需要解决的重要问题。再次感谢评论!
如果有人愿意提出一些代码改进建议,尤其是 w.r.t 以使这种并发安全,请随时在此处发表评论或填写 GH 问题/PR。
会话类型:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_type.go
会话监控:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_monitor.go
测试:
https://github.com/marvin-hansen/typedb-client-go/tree/main/test/client/session
我的两分钱:
- 您可能需要 运行 重复的心跳。在 select
周围使用带有 time.Ticker
的 for
- 存储地图会话ID —> func() 以跟踪所有可取消的上下文。也许你应该将 id 转换为 string
我目前正在为 TypeDB 实现 Golang 客户端,并且正在为他们基于会话的心跳约定而苦苦挣扎。通常,你为每个客户端实现心跳,这样相对容易,只需 运行 在后台运行一个 gorountine,每隔几秒发送一次心跳。
然而,TypeDB 选择在每个会话基础上实施心跳(他们称之为脉冲)。这意味着,每次创建新会话时,我都必须开始使用单独的 GoRoutine 监视该会话。反之,如果客户端关闭了一个session,我就得停止监听。特别难看的是,我还必须每隔一段时间检查一次停滞的会话。切换到每个客户端心跳时存在 GH 问题,但没有 ETA,因此我必须使会话心跳起作用以防止服务端会话终止。
到目前为止,我的解决方案:
- 创建一个新会话
- 打开该会话并检查错误
- 如果没有错误,将会话添加到由会话 ID 键控的哈希映射中
这似乎暂时有效。代码,仅供上下文参考:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session.go
为了监控每个会话,我正在考虑两个问题:
香奈儿关闭多个 gorountines 有点棘手,可能会导致竞争条件。
我需要某种错误组来捕获心跳故障,即在服务器关闭或网络 link 错误的情况下。
考虑到所有这些,我相信 context.WithCancel
可能是安全而明智的解决方案。
到目前为止我想到的是:
- 将全局上下文作为参数传递给心跳函数
- 为每个调用 heartbeat 的会话创建一个新的上下文 WithCancel
- 运行 GoRoutine 中的心跳直到调用取消(通过 stopMonitoring)或发生错误
我不太清楚的是,我如何跟踪从每个跟踪会话返回的所有取消函数,以确保我关闭与要关闭的会话匹配的正确 GoRotuine?
感谢您提供解决此问题的任何提示。
代码:
func (s SessionManager) startMonitorSession(sessionID []byte) {
// How do I track each goRoutine per session
}
func (s SessionManager) stopMonitorSession(sessionID []byte) {
// How do I call the correct cancel function to stop the GoRoutine matching the session?
}
func (s SessionManager) runHeartbeat(ctx context.Context, sessionID []byte) context.CancelFunc {
// Create a new context, with its cancellation function from the original context
ctx, cancel := context.WithCancel(ctx)
go func() {
select {
case <-ctx.Done():
fmt.Println("Stopped monitoring session: ")
default:
err := s.sendPulseRequest(sessionID)
// If this operation returns an error
// cancel all operations using this local context created above
if err != nil {
cancel()
}
fmt.Println("done")
}
}()
// return cancel function for call site to close at a later stage
return cancel
}
func (s SessionManager) sendPulseRequest(sessionID []byte) error {
mtd := "sendPulse: "
req := requests.GetSessionPulseReq(sessionID)
res, pulseErr := s.client.client.SessionPulse(s.client.ctx, req)
if pulseErr != nil {
dbgPrint(mtd, "Heartbeat error. Close session")
return pulseErr
}
if res.Alive == false {
dbgPrint(mtd, "Server not alive anymore. Close session")
closeErr := s.CloseSession(sessionID)
if closeErr != nil {
return closeErr
}
}
// no error
return nil
}
更新:
感谢评论,我通过将会话和 CancelFunc 包装在一个名为 TypeDBSession 的专用结构中设法解决了大部分问题。
这样,停止函数只是从结构中拉取 CancelFunc,调用它,然后停止监视 GoRoutine。通过一些更多的调整,测试似乎通过了,尽管这暂时不是并发安全的。
话虽这么说,但这是一个需要解决的重要问题。再次感谢评论!
如果有人愿意提出一些代码改进建议,尤其是 w.r.t 以使这种并发安全,请随时在此处发表评论或填写 GH 问题/PR。
会话类型:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_type.go
会话监控:
https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_monitor.go
测试:
https://github.com/marvin-hansen/typedb-client-go/tree/main/test/client/session
我的两分钱:
- 您可能需要 运行 重复的心跳。在 select 周围使用带有
- 存储地图会话ID —> func() 以跟踪所有可取消的上下文。也许你应该将 id 转换为 string
time.Ticker
的 for