如何从阻塞的 Recv() 调用中解除 go gRPC bidi-streaming 服务器?
How to un-wedge go gRPC bidi-streaming server from the blocking Recv() call?
在 golang 的 gRPC 中提供双向流时,规范流处理程序如下所示:
func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {
for {
data, err := stream.Recv()
if err == io.EOF {
return nil // clean close
}
if err != nil {
return err // some other error
}
// do things with data here
}
}
具体来说,当 bidi RPC 的处理程序 returns 时,这是考虑服务器端关闭的信号。
这是一个同步编程模型——在等待来自客户端的消息时,服务器在这个 goroutine(由 grpc 库创建)中保持阻塞状态。
现在,我想取消阻止此 Recv() 调用(最终在底层 grpc.ServerStream 上调用 RecvMsg())和 return/close 流,因为服务器进程已决定与此客户完成。
不幸的是,我找不到明显的方法来做到这一点:
- 在为我的服务生成的 bidi 服务器接口上没有类似 Close() 或 CloseSend() 或 CloseRecv() 或 Shutdown() 的函数
- 我可以使用 stream.Context() 获取的流中的上下文不会公开用户可访问的取消函数
- 我找不到一种方法来为 grpc.Server 接受的新连接在“起始端”传递上下文,我可以在其中注入自己的取消函数
我可以通过调用 Stop() 来关闭整个 grpc.Server,但这不是我想要做的 -- 只有这个特定的客户端连接 (grpc.ServerStream) 应该完成。
我可以向客户端发送一条消息,让客户端依次关闭连接。但是,如果客户端掉线了,这将不起作用,这将通过超时来解决,超时必须很长才能普遍健壮。我现在 想要它,因为我没有耐心,而且更重要的是,在规模上,悬而未决的无响应客户端可能会付出高昂的代价。
我可以(也许)通过反射挖掘 grpc.ServerStream 直到找到 transportStream,然后从中挖掘出取消函数并调用它。或者通过反射挖掘 stream.Context() ,并制作我自己的取消函数引用来调用。对于未来的维护者来说,这些似乎都不是很好的建议。
但这些肯定不是唯一的选择吧?决定某个特定客户端不再需要连接并不是魔术 space-外星科学。我如何关闭此流,以便 Recv() 调用从服务器进程端解除阻塞,而不涉及到客户端的往返?
不幸的是,我认为没有很好的方法可以满足您的要求。根据您的目标,我认为您有两种选择:
运行 在 goroutine 中接收,并在需要时从 bidi 处理程序中 return return。这将关闭上下文并取消阻止 Recv。这显然是次优的,因为它需要小心,因为您现在有代码在处理程序的执行范围之外执行。然而,这是我能找到的最接近的答案。
如果您试图通过设置超时来减轻客户端行为不当的影响,您可以使用 KeepaliveEnforcementPolicy and/or KeepaliveParams 将此工作卸载到框架。如果这符合您希望关闭连接的原因,这可能是更可取的,但除此之外没有多大用处。
在 golang 的 gRPC 中提供双向流时,规范流处理程序如下所示:
func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {
for {
data, err := stream.Recv()
if err == io.EOF {
return nil // clean close
}
if err != nil {
return err // some other error
}
// do things with data here
}
}
具体来说,当 bidi RPC 的处理程序 returns 时,这是考虑服务器端关闭的信号。
这是一个同步编程模型——在等待来自客户端的消息时,服务器在这个 goroutine(由 grpc 库创建)中保持阻塞状态。
现在,我想取消阻止此 Recv() 调用(最终在底层 grpc.ServerStream 上调用 RecvMsg())和 return/close 流,因为服务器进程已决定与此客户完成。
不幸的是,我找不到明显的方法来做到这一点:
- 在为我的服务生成的 bidi 服务器接口上没有类似 Close() 或 CloseSend() 或 CloseRecv() 或 Shutdown() 的函数
- 我可以使用 stream.Context() 获取的流中的上下文不会公开用户可访问的取消函数
- 我找不到一种方法来为 grpc.Server 接受的新连接在“起始端”传递上下文,我可以在其中注入自己的取消函数
我可以通过调用 Stop() 来关闭整个 grpc.Server,但这不是我想要做的 -- 只有这个特定的客户端连接 (grpc.ServerStream) 应该完成。
我可以向客户端发送一条消息,让客户端依次关闭连接。但是,如果客户端掉线了,这将不起作用,这将通过超时来解决,超时必须很长才能普遍健壮。我现在 想要它,因为我没有耐心,而且更重要的是,在规模上,悬而未决的无响应客户端可能会付出高昂的代价。
我可以(也许)通过反射挖掘 grpc.ServerStream 直到找到 transportStream,然后从中挖掘出取消函数并调用它。或者通过反射挖掘 stream.Context() ,并制作我自己的取消函数引用来调用。对于未来的维护者来说,这些似乎都不是很好的建议。
但这些肯定不是唯一的选择吧?决定某个特定客户端不再需要连接并不是魔术 space-外星科学。我如何关闭此流,以便 Recv() 调用从服务器进程端解除阻塞,而不涉及到客户端的往返?
不幸的是,我认为没有很好的方法可以满足您的要求。根据您的目标,我认为您有两种选择:
运行 在 goroutine 中接收,并在需要时从 bidi 处理程序中 return return。这将关闭上下文并取消阻止 Recv。这显然是次优的,因为它需要小心,因为您现在有代码在处理程序的执行范围之外执行。然而,这是我能找到的最接近的答案。
如果您试图通过设置超时来减轻客户端行为不当的影响,您可以使用 KeepaliveEnforcementPolicy and/or KeepaliveParams 将此工作卸载到框架。如果这符合您希望关闭连接的原因,这可能是更可取的,但除此之外没有多大用处。