从 gRPC 服务器检测客户端上下文破坏

Detect client context destruction from gRPC server

我创建了一个 Async C++ gRPC 服务器,它提供了几个类似的 API,其签名类似于:

service Foo {
    rpc FunctionalityA(ARequest) returns (stream AResponse);
    rpc FunctionalityB(BRequest) returns (stream BResponse);

客户端创建一个连接到此服务的通道,并使用从不同线程调用各种 RPC,如下所示:

class FooClient {

// ...

    void FunctionalityA() {
        auto stub = example::Foo::NewStub(m_channel);
        grpc::ClientContext context;
        example::ARequest request;
        example::AResponse response;
        auto reader = stub->FunctionalityA(&context, request);
        for(int i = 0; i < 3; i++) {

    void FunctionalityB() {
        auto stub = example::Foo::NewStub(m_channel);
        grpc::ClientContext context;
        example::BRequest request;
        example::BResponse response;
        auto reader = stub->FunctionalityB(&context, request);
        for(int i = 0; i < 3; i++) {

// ...


int main() {
    // ...
    FooClient client(grpc::CreateChannel("", grpc::InsecureChannelCredentials()));
    auto ta = std::thread(&FooClient::FunctionalityA, &client);
    auto tb = std::thread(&FooClient::FunctionalityB, &client);
    // ...


我面临的问题是,即使与两个功能之一关联的 ClientContext 超出范围(在示例中读取 3 次之后),服务器也不会收到任何信息并继续写入,并且“ok”状态保持为真。 “ok”状态变为 false 并允许我仅在客户端断开连接时停止写入。

这是 gRPC 的预期行为吗?客户端是否需要发送特定的“死亡之吻”消息以通知服务器停止写入流?


void FunctionalityB::ProcessRequest(bool ok, RequestState state) {
    if(!ok) {
        if(state == RequestState::START) {
            // the server has been Shutdown before this particular call got matched to an incoming RPC
            delete this;
        } else if(state == RequestState::WRITE || state == RequestState::FINISH) {
            // not going to the wire because the call is already dead (i.e., canceled, deadline expired, other side dropped the channel, etc).
            delete this;
        } else {
            // unhandled state
    } else {
        if(state == RequestState::START) {
            // the RPC has indeed been started
            m_writer.Write(m_response, CreateTag(RequestState::WRITE));
            // the constructor of the functionality requests a new one to handle future new connections
            new FunctionalityB(m_completion_queue, m_service, m_worker);
        } else if(state == RequestState::WRITE) {
            // TODO do some real work
            m_writer.Write(m_response, CreateTag(RequestState::WRITE)); // this write will continue forever, even after client stops reading and TryCancel its context
        } else if(state == RequestState::FINISH) {
            delete this;
        } else {
            // unhandled state



