如何将 grpc c++ ClientAsyncReader<Message> 用于服务器端流

How to use grpc c++ ClientAsyncReader<Message> for server side streams

我正在使用一个非常简单的原型,其中消息仅包含 1 个字符串字段。像这样:

service LongLivedConnection {  
  // Starts a grpc connection
  rpc Connect(Connection) returns (stream Message) {}
}

message Connection{
  string userId = 1;
}

message Message{
  string serverMessage = 1;
}

用例是客户端应该连接到服务器,服务器将使用这个grpc来推送消息。

现在,对于客户端代码,假设我已经在一个工作线程中,我该如何正确设置它,以便我可以在随机时间连续接收来自服务器的消息?

void StartConnection(const std::string& user) {
  Connection request;
  request.set_userId(user);

  Message message;
  ClientContext context;

  stub_->Connect(&context, request, &reply);

  // What should I do from now on? 

  // notify(serverMessage);
}

void notify(std::string message) {
  // generate message events and pass to main event loop
}

我知道如何使用 api。看起来它非常灵活,但仍然有点奇怪,因为我通常只希望异步 api 接收某种 lambda 回调。

下面的代码是阻塞的,您必须 运行 在不同的线程中这样做,这样它就不会阻塞您的应用程序。

我相信你可以让多个线程访问 CompletionQueue,但在我的例子中,我只有一个线程处理这个 grpc 连接。

GrpcConnection.h file:
public:
void StartGrpcConnection();

private:
std::shared_ptr<grpc::Channel> m_channel;
std::unique_ptr<grpc::ClientReader<push_notifications::Message>> m_reader;
std::unique_ptr<push_notifications::PushNotificationService::Stub> m_stub;
GrpcConnection.cpp files:
...
void GrpcConnectionService::StartGrpcConnection()
{
    m_channel = grpc::CreateChannel("localhost:50051",grpc::InsecureChannelCredentials());
    LongLiveConnection::Connect request;
    request.set_user_id(12345);
    m_stub = LongLiveConnection::LongLiveConnectionService::NewStub(m_channel);
    
    grpc::ClientContext context;
    grpc::CompletionQueue cq;
    std::unique_ptr<grpc::ClientAsyncReader<LongLiveConnection::Message>> reader =
            m_stub->PrepareAsyncConnect(&context, request, &cq);

    void* got_tag;
    bool ok = false;
    LongLiveConnection::Message reply;

    reader->StartCall((void*)1);
    cq.Next(&got_tag, &ok);

    if (ok && got_tag == (void*)1)
    {
        // startCall() is successful if ok is true, and got_tag is void*1

        // start the first read message with a different hardcoded tag
        reader->Read(&reply, (void*)2);

        while (true)
        {
            ok = false;

            cq.Next(&got_tag, &ok);

            if (got_tag == (void*)2)
            {
                // this is the message from server
                std::string body = reply.server_message();
                // do whatever you want with body, in my case i push it to my applications' event stream to be processed by other components
                
                // lastly, initialize another read
                reader->Read(&reply, (void*)2);
            }
            else if (got_tag == (void*)3)
            {
                 // if you do something else, such as listening to GRPC channel state change, in your call, you can pass a different hardcoded tag, then, in here, you will be notified when the result is received from that call.
            }
        }
    }
}