如何使用 gracefulStop 关闭所有 grpc 服务器流?
How to close all grpc server streams using gracefulStop?
我正在尝试停止从服务器端连接到流服务器的所有客户端。
实际上我正在使用 GracefulStop
方法来优雅地处理它。
我正在等待通道上的 os.Interrupt
信号以对 gRPC 执行正常停止。但是当客户端连接时它卡在 server.GracefulStop()
。
func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
ctx := srv.Context()
updateCh := make(chan *clientapi.Update, 100)
stopCh := make(chan bool)
defer func() {
stopCh<-true
close(updateCh)
}
go func() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
ticker.Stop()
close(stopCh)
}
for {
select {
case <-stopCh:
return
case <-ticker.C:
updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}
}
}
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case notif := <-updateCh:
err := srv.Send(notif)
if err == io.EOF {
return nil
}
if err != nil {
s.logger.Named("Subscribe").Error("error", zap.Error(err))
continue
}
}
}
}
我预计方法 ctx.Done()
中的 context
可以处理它并中断 for 循环。
如何关闭所有像这样的响应流?
为您的 gRPC 服务创建一个 global context
。所以遍历各个部分:
- 每个 gRPC 服务请求都将使用此上下文(连同客户端上下文)来完成该请求
os.Interrupt
处理程序将取消全局上下文;从而取消任何当前 运行 请求
- 最终发布
server.GracefulStop()
- 应该等待所有活动的 gRPC 调用完成(如果他们没有立即看到取消)
因此,例如,在设置 gRPC 服务时:
pctx := context.Background()
globalCtx, globalCancel := context.WithCancel(pctx)
mysrv := MyService{
gctx: globalCtx
}
s := grpc.NewServer()
pb.RegisterMyService(s, mysrv)
os.Interrupt
处理程序启动并等待关闭:
globalCancel()
server.GracefulStop()
gRPC 方法:
func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {
// merge client and server contexts into one `mctx`
// (client context will cancel if client disconnects)
// (server context will cancel if service Ctrl-C'ed)
mctx, mcancel := mergeContext(ctx, s.gctx)
defer mcancel() // so we don't leak, if neither client or server context cancels
// RPC WORK GOES HERE
// RPC WORK GOES HERE
// RPC WORK GOES HERE
// pass mctx to any blocking calls:
// - http REST calls
// - SQL queries etc.
// - or if running a long loop; status check the context occasionally like so:
// Example long request (10s)
for i:=0; i<10*1000; i++ {
time.Sleep(1*time.Milliscond)
// poll merged context
select {
case <-mctx.Done():
return fmt.Errorf("request canceled: %s", mctx.Err())
default:
}
}
}
并且:
func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {
mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels
go func() {
select {
case <-mctx.Done(): // don't leak go-routine on clean gRPC run
case <-b.Done():
mcancel() // b canceled, so cancel mctx
}
}()
return mctx, mcancel
}
通常客户端需要假定 RPC 可以随时终止(例如由于连接错误或服务器电源故障)。所以我们所做的是 GracefulStop
,休眠一小段时间,让运行中的 RPC 有机会自然完成,然后硬 Stop
服务器。如果您确实需要使用此终止信号来结束您的 RPC,那么@colminator 的答案可能是最佳选择。但这种情况应该不常见,如果您确实发现有必要在服务器关闭时手动结束流式 RPC,您可能需要花一些时间分析您的设计。
我正在尝试停止从服务器端连接到流服务器的所有客户端。
实际上我正在使用 GracefulStop
方法来优雅地处理它。
我正在等待通道上的 os.Interrupt
信号以对 gRPC 执行正常停止。但是当客户端连接时它卡在 server.GracefulStop()
。
func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
ctx := srv.Context()
updateCh := make(chan *clientapi.Update, 100)
stopCh := make(chan bool)
defer func() {
stopCh<-true
close(updateCh)
}
go func() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
ticker.Stop()
close(stopCh)
}
for {
select {
case <-stopCh:
return
case <-ticker.C:
updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}
}
}
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case notif := <-updateCh:
err := srv.Send(notif)
if err == io.EOF {
return nil
}
if err != nil {
s.logger.Named("Subscribe").Error("error", zap.Error(err))
continue
}
}
}
}
我预计方法 ctx.Done()
中的 context
可以处理它并中断 for 循环。
如何关闭所有像这样的响应流?
为您的 gRPC 服务创建一个 global context
。所以遍历各个部分:
- 每个 gRPC 服务请求都将使用此上下文(连同客户端上下文)来完成该请求
os.Interrupt
处理程序将取消全局上下文;从而取消任何当前 运行 请求- 最终发布
server.GracefulStop()
- 应该等待所有活动的 gRPC 调用完成(如果他们没有立即看到取消)
因此,例如,在设置 gRPC 服务时:
pctx := context.Background()
globalCtx, globalCancel := context.WithCancel(pctx)
mysrv := MyService{
gctx: globalCtx
}
s := grpc.NewServer()
pb.RegisterMyService(s, mysrv)
os.Interrupt
处理程序启动并等待关闭:
globalCancel()
server.GracefulStop()
gRPC 方法:
func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {
// merge client and server contexts into one `mctx`
// (client context will cancel if client disconnects)
// (server context will cancel if service Ctrl-C'ed)
mctx, mcancel := mergeContext(ctx, s.gctx)
defer mcancel() // so we don't leak, if neither client or server context cancels
// RPC WORK GOES HERE
// RPC WORK GOES HERE
// RPC WORK GOES HERE
// pass mctx to any blocking calls:
// - http REST calls
// - SQL queries etc.
// - or if running a long loop; status check the context occasionally like so:
// Example long request (10s)
for i:=0; i<10*1000; i++ {
time.Sleep(1*time.Milliscond)
// poll merged context
select {
case <-mctx.Done():
return fmt.Errorf("request canceled: %s", mctx.Err())
default:
}
}
}
并且:
func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {
mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels
go func() {
select {
case <-mctx.Done(): // don't leak go-routine on clean gRPC run
case <-b.Done():
mcancel() // b canceled, so cancel mctx
}
}()
return mctx, mcancel
}
通常客户端需要假定 RPC 可以随时终止(例如由于连接错误或服务器电源故障)。所以我们所做的是 GracefulStop
,休眠一小段时间,让运行中的 RPC 有机会自然完成,然后硬 Stop
服务器。如果您确实需要使用此终止信号来结束您的 RPC,那么@colminator 的答案可能是最佳选择。但这种情况应该不常见,如果您确实发现有必要在服务器关闭时手动结束流式 RPC,您可能需要花一些时间分析您的设计。