GRPC:保留对流的引用以将数据发送到多个客户端
GRPC: keep a reference to stream to send data to multiple clients
我正在使用 go 开始使用 GRPC。我阅读了官方文档和一些示例。
在大多数示例中,您不识别客户端,而是使用流来读取/写入数据。
我看到上下文中有 API 来检索身份验证信息,并且可以为 ChatRequest 识别客户端。
但是,如果我想根据客户端 ID 保留对 Stream 的引用/索引怎么办?
例如,
假设我在一个聊天室中有 3 个用户。我将 rpc 表示为(它也可以是服务器流)
rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}
比如说,一个用户向群组发送了一条消息,该群组需要发送给其他 2 个。所以,如果我需要通过
当前为这些用户打开的流,保留流的引用有多安全。
实施会像...
type chatServiceServer struct {
// keep a map of subscribers / users currently connected; protect with mutex
}
func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
// md, ok := metadata.FromIncomingContext(stream.Context())
// p, ok := peer.FromContext(ctx)
// ... identify client from above
for {
// save the message to DB
// find other users in the chatroom is currently connected
// if so, stream.Send(m)
// else notify ....
}
}
但是,我在 API 文档中看到了警告,想知道更好的方法。
https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error
任何订阅(事件,....)都会出现类似的用例,并且需要根据客户端 ID 进行通知。任何示例代码,文章也很棒。
谢谢
保存流以供其他地方使用应该是安全的,但您只能在单个 goroutine 中调用 SendMsg
(同样的限制也适用于 RecvMsg
,独立)。所以这意味着如果你在你的方法处理程序中这样做:
for {
if err := stream.Recv(req); err != nil {
return err
}
for _, s := range allStreams[req.ID] {
if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
}
}
然后对 s.Send
的调用必须 由锁保护,因为这些处理程序中的多个可能 运行 同时。 (另外 allStreams
被假定为 map[ID][]stream
,在这种情况下它也必须被锁保护。)
我正在使用 go 开始使用 GRPC。我阅读了官方文档和一些示例。
在大多数示例中,您不识别客户端,而是使用流来读取/写入数据。 我看到上下文中有 API 来检索身份验证信息,并且可以为 ChatRequest 识别客户端。 但是,如果我想根据客户端 ID 保留对 Stream 的引用/索引怎么办?
例如,
假设我在一个聊天室中有 3 个用户。我将 rpc 表示为(它也可以是服务器流)
rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}
比如说,一个用户向群组发送了一条消息,该群组需要发送给其他 2 个。所以,如果我需要通过 当前为这些用户打开的流,保留流的引用有多安全。
实施会像...
type chatServiceServer struct {
// keep a map of subscribers / users currently connected; protect with mutex
}
func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
// md, ok := metadata.FromIncomingContext(stream.Context())
// p, ok := peer.FromContext(ctx)
// ... identify client from above
for {
// save the message to DB
// find other users in the chatroom is currently connected
// if so, stream.Send(m)
// else notify ....
}
}
但是,我在 API 文档中看到了警告,想知道更好的方法。
https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error
任何订阅(事件,....)都会出现类似的用例,并且需要根据客户端 ID 进行通知。任何示例代码,文章也很棒。
谢谢
保存流以供其他地方使用应该是安全的,但您只能在单个 goroutine 中调用 SendMsg
(同样的限制也适用于 RecvMsg
,独立)。所以这意味着如果你在你的方法处理程序中这样做:
for {
if err := stream.Recv(req); err != nil {
return err
}
for _, s := range allStreams[req.ID] {
if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
}
}
然后对 s.Send
的调用必须 由锁保护,因为这些处理程序中的多个可能 运行 同时。 (另外 allStreams
被假定为 map[ID][]stream
,在这种情况下它也必须被锁保护。)