从 gRPC 服务器检测客户端上下文破坏

Detect client context destruction from gRPC server

我创建了一个 Async C++ gRPC 服务器,它提供了几个类似的 API,其签名类似于:

service Foo {
    rpc FunctionalityA(ARequest) returns (stream AResponse);
    rpc FunctionalityB(BRequest) returns (stream BResponse);
}

客户端创建一个连接到此服务的通道,并使用从不同线程调用各种 RPC,如下所示:

class FooClient {

// ...

    void FunctionalityA() {
        auto stub = example::Foo::NewStub(m_channel);
        grpc::ClientContext context;
        example::ARequest request;
        example::AResponse response;
        auto reader = stub->FunctionalityA(&context, request);
        for(int i = 0; i < 3; i++) {
            reader->Read(&response);
        }
    }

    void FunctionalityB() {
        auto stub = example::Foo::NewStub(m_channel);
        grpc::ClientContext context;
        example::BRequest request;
        example::BResponse response;
        auto reader = stub->FunctionalityB(&context, request);
        for(int i = 0; i < 3; i++) {
            reader->Read(&response);
        }
    }

// ...

};

int main() {
    // ...
    FooClient client(grpc::CreateChannel("127.0.0.1:12345", grpc::InsecureChannelCredentials()));
    auto ta = std::thread(&FooClient::FunctionalityA, &client);
    auto tb = std::thread(&FooClient::FunctionalityB, &client);
    // ...
}

我想实现服务器以便:

我面临的问题是,即使与两个功能之一关联的 ClientContext 超出范围(在示例中读取 3 次之后),服务器也不会收到任何信息并继续写入,并且“ok”状态保持为真。 “ok”状态变为 false 并允许我仅在客户端断开连接时停止写入。

这是 gRPC 的预期行为吗?客户端是否需要发送特定的“死亡之吻”消息以通知服务器停止写入流?

为了完整起见,这里是功能服务器端的实现示例:

void FunctionalityB::ProcessRequest(bool ok, RequestState state) {
    if(!ok) {
        if(state == RequestState::START) {
            // the server has been Shutdown before this particular call got matched to an incoming RPC
            delete this;
        } else if(state == RequestState::WRITE || state == RequestState::FINISH) {
            // not going to the wire because the call is already dead (i.e., canceled, deadline expired, other side dropped the channel, etc).
            delete this;
        } else {
            // unhandled state
        }
    } else {
        if(state == RequestState::START) {
            // the RPC has indeed been started
            m_writer.Write(m_response, CreateTag(RequestState::WRITE));
            // the constructor of the functionality requests a new one to handle future new connections
            new FunctionalityB(m_completion_queue, m_service, m_worker);
        } else if(state == RequestState::WRITE) {
            // TODO do some real work
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            m_writer.Write(m_response, CreateTag(RequestState::WRITE)); // this write will continue forever, even after client stops reading and TryCancel its context
        } else if(state == RequestState::FINISH) {
            delete this;
        } else {
            // unhandled state
        }
    }
}

有两种方法可以检测服务器上的呼叫取消。

第一个是检查ServerContext::IsCancelled()。这是您可以在写入之前检查的内容,在这种情况下可能没问题。但是,在一般情况下,它可能并不理想,因为您的应用程序在执行另一次写入之前可能正在等待其他事件(除了先前的写入完成),并且理想情况下您希望在取消时获得通知的异步方式发生了。

这让我想到了第二种方法,即当通过调用 ServerContext::AsyncNotifyWhenDone() before the RPC starts. This will give you async notification of the cancellation, but unfortunately, the API is very cumbersome and has a few sharp edges. (This is something that is handled much more cleanly in the new callback-based API, but that API isn't that performant in OSS until we finish the EventEngine 努力取消调用时,在完成队列中请求一个事件。)

希望此信息对您有所帮助。