如何使用 context.WithCancel 启动和停止每个会话的心跳?

How to start & stop heartbeat per session using context.WithCancel?

我目前正在为 TypeDB 实现 Golang 客户端,并且正在为他们基于会话的心跳约定而苦苦挣扎。通常,你为每个客户端实现心跳,这样相对容易,只需 运行 在后台运行一个 gorountine,每隔几秒发送一次心跳。

然而,TypeDB 选择在每个会话基础上实施心跳(他们称之为脉冲)。这意味着,每次创建新会话时,我都必须开始使用单独的 GoRoutine 监视该会话。反之,如果客户端关闭了一个session,我就得停止监听。特别难看的是,我还必须每隔一段时间检查一次停滞的会话。切换到每个客户端心跳时存在 GH 问题,但没有 ETA,因此我必须使会话心跳起作用以防止服务端会话终止。

到目前为止,我的解决方案:

  1. 创建一个新会话
  2. 打开该会话并检查错误
  3. 如果没有错误,将会话添加到由会话 ID 键控的哈希映射中

这似乎暂时有效。代码,仅供上下文参考:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session.go

为了监控每个会话,我正在考虑两个问题:

  1. 香奈儿关闭多个 gorountines 有点棘手,可能会导致竞争条件。

  2. 我需要某种错误组来捕获心跳故障,即在服务器关闭或网络 link 错误的情况下。

考虑到所有这些,我相信 context.WithCancel 可能是安全而明智的解决方案。

到目前为止我想到的是:

  1. 将全局上下文作为参数传递给心跳函数
  2. 为每个调用 heartbeat 的会话创建一个新的上下文 WithCancel
  3. 运行 GoRoutine 中的心跳直到调用取消(通过 stopMonitoring)或发生错误

我不太清楚的是,我如何跟踪从每个跟踪会话返回的所有取消函数,以确保我关闭与要关闭的会话匹配的正确 GoRotuine?

感谢您提供解决此问题的任何提示。

代码:


func (s SessionManager) startMonitorSession(sessionID []byte) {
    // How do I track each goRoutine per session

}

func (s SessionManager) stopMonitorSession(sessionID []byte) {
    // How do I call the correct cancel function to stop the GoRoutine matching the session?
}

func (s SessionManager) runHeartbeat(ctx context.Context, sessionID []byte) context.CancelFunc {

    // Create a new context, with its cancellation function from the original context
    ctx, cancel := context.WithCancel(ctx)
    go func() {
        select {
        case <-ctx.Done():
            fmt.Println("Stopped monitoring session: ")
        default:
            err := s.sendPulseRequest(sessionID)
            // If this operation returns an error
            // cancel all operations using this local context created above
            if err != nil {
                cancel()
            }
            fmt.Println("done")
        }
    }()

    // return cancel function for call site to close at a later stage
    return cancel
}

func (s SessionManager) sendPulseRequest(sessionID []byte) error {
    mtd := "sendPulse: "

    req := requests.GetSessionPulseReq(sessionID)
    res, pulseErr := s.client.client.SessionPulse(s.client.ctx, req)
    if pulseErr != nil {
        dbgPrint(mtd, "Heartbeat error. Close session")
        return pulseErr
    }
    if res.Alive == false {
        dbgPrint(mtd, "Server not alive anymore. Close session")
        closeErr := s.CloseSession(sessionID)
        if closeErr != nil {
            return closeErr
        }
    }

    // no error
    return nil
}

更新:

感谢评论,我通过将会话和 CancelFunc 包装在一个名为 TypeDBSession 的专用结构中设法解决了大部分问题。

这样,停止函数只是从结构中拉取 CancelFunc,调用它,然后停止监视 GoRoutine。通过一些更多的调整,测试似乎通过了,尽管这暂时不是并发安全的。

话虽这么说,但这是一个需要解决的重要问题。再次感谢评论!

如果有人愿意提出一些代码改进建议,尤其是 w.r.t 以使这种并发安全,请随时在此处发表评论或填写 GH 问题/PR。

会话类型:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_type.go

会话监控:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_monitor.go

测试:

https://github.com/marvin-hansen/typedb-client-go/tree/main/test/client/session

我的两分钱:

  1. 您可能需要 运行 重复的心跳。在 select
  2. 周围使用带有 time.Ticker 的 for
  3. 存储地图会话ID —> func() 以跟踪所有可取消的上下文。也许你应该将 id 转换为 string