GRPC:保留对流的引用以将数据发送到多个客户端

GRPC: keep a reference to stream to send data to multiple clients

我正在使用 go 开始使用 GRPC。我阅读了官方文档和一些示例。

在大多数示例中,您不识别客户端,而是使用流来读取/写入数据。 我看到上下文中有 API 来检索身份验证信息,并且可以为 ChatRequest 识别客户端。 但是,如果我想根据客户端 ID 保留对 Stream 的引用/索引怎么办?

例如,

假设我在一个聊天室中有 3 个用户。我将 rpc 表示为(它也可以是服务器流)

rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}

比如说,一个用户向群组发送了一条消息,该群组需要发送给其他 2 个。所以,如果我需要通过 当前为这些用户打开的流,保留流的引用有多安全。

实施会像...

type chatServiceServer struct {
    // keep a map of subscribers / users currently connected; protect with mutex
}

func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
    // md, ok := metadata.FromIncomingContext(stream.Context())
    // p, ok := peer.FromContext(ctx)
    // ... identify client from above

    for {
        // save the message to DB
        // find other users in the chatroom is currently connected
        // if so, stream.Send(m)
        // else notify ....
    }
}

但是,我在 API 文档中看到了警告,想知道更好的方法。

https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

任何订阅(事件,....)都会出现类似的用例,并且需要根据客户端 ID 进行通知。任何示例代码,文章也很棒。

谢谢

保存流以供其他地方使用应该是安全的,但您只能在单个 goroutine 中调用 SendMsg(同样的限制也适用于 RecvMsg,独立)。所以这意味着如果你在你的方法处理程序中这样做:

for {
  if err := stream.Recv(req); err != nil {
    return err
  }
  for _, s := range allStreams[req.ID] {
    if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
  }
}

然后对 s.Send 的调用必须 由锁保护,因为这些处理程序中的多个可能 运行 同时。 (另外 allStreams 被假定为 map[ID][]stream,在这种情况下它也必须被锁保护。)