从 gRPC 服务器检测客户端上下文破坏
Detect client context destruction from gRPC server
我创建了一个 Async C++ gRPC 服务器,它提供了几个类似的 API,其签名类似于:
service Foo {
rpc FunctionalityA(ARequest) returns (stream AResponse);
rpc FunctionalityB(BRequest) returns (stream BResponse);
}
客户端创建一个连接到此服务的通道,并使用从不同线程调用各种 RPC,如下所示:
class FooClient {
// ...
void FunctionalityA() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::ARequest request;
example::AResponse response;
auto reader = stub->FunctionalityA(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
void FunctionalityB() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::BRequest request;
example::BResponse response;
auto reader = stub->FunctionalityB(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
// ...
};
int main() {
// ...
FooClient client(grpc::CreateChannel("127.0.0.1:12345", grpc::InsecureChannelCredentials()));
auto ta = std::thread(&FooClient::FunctionalityA, &client);
auto tb = std::thread(&FooClient::FunctionalityB, &client);
// ...
}
我想实现服务器以便:
- 调用 FunctionalityA 时,它开始流式传输 AResponse 类型的对象
- 调用 FunctionalityB 时,它开始流式传输 BResponse 类型的对象
- 当用于调用 FunctionalityA 的上下文被取消时,AResponse 的流式传输结束
- 当用于调用 FunctionalityB 的上下文被取消时,BResponse 的流式传输结束
我面临的问题是,即使与两个功能之一关联的 ClientContext 超出范围(在示例中读取 3 次之后),服务器也不会收到任何信息并继续写入,并且“ok”状态保持为真。
“ok”状态变为 false 并允许我仅在客户端断开连接时停止写入。
这是 gRPC 的预期行为吗?客户端是否需要发送特定的“死亡之吻”消息以通知服务器停止写入流?
为了完整起见,这里是功能服务器端的实现示例:
void FunctionalityB::ProcessRequest(bool ok, RequestState state) {
if(!ok) {
if(state == RequestState::START) {
// the server has been Shutdown before this particular call got matched to an incoming RPC
delete this;
} else if(state == RequestState::WRITE || state == RequestState::FINISH) {
// not going to the wire because the call is already dead (i.e., canceled, deadline expired, other side dropped the channel, etc).
delete this;
} else {
// unhandled state
}
} else {
if(state == RequestState::START) {
// the RPC has indeed been started
m_writer.Write(m_response, CreateTag(RequestState::WRITE));
// the constructor of the functionality requests a new one to handle future new connections
new FunctionalityB(m_completion_queue, m_service, m_worker);
} else if(state == RequestState::WRITE) {
// TODO do some real work
std::this_thread::sleep_for(std::chrono::milliseconds(50));
m_writer.Write(m_response, CreateTag(RequestState::WRITE)); // this write will continue forever, even after client stops reading and TryCancel its context
} else if(state == RequestState::FINISH) {
delete this;
} else {
// unhandled state
}
}
}
有两种方法可以检测服务器上的呼叫取消。
第一个是检查ServerContext::IsCancelled()
。这是您可以在写入之前检查的内容,在这种情况下可能没问题。但是,在一般情况下,它可能并不理想,因为您的应用程序在执行另一次写入之前可能正在等待其他事件(除了先前的写入完成),并且理想情况下您希望在取消时获得通知的异步方式发生了。
这让我想到了第二种方法,即当通过调用 ServerContext::AsyncNotifyWhenDone()
before the RPC starts. This will give you async notification of the cancellation, but unfortunately, the API is very cumbersome and has a few sharp edges. (This is something that is handled much more cleanly in the new callback-based API, but that API isn't that performant in OSS until we finish the EventEngine 努力取消调用时,在完成队列中请求一个事件。)
希望此信息对您有所帮助。
我创建了一个 Async C++ gRPC 服务器,它提供了几个类似的 API,其签名类似于:
service Foo {
rpc FunctionalityA(ARequest) returns (stream AResponse);
rpc FunctionalityB(BRequest) returns (stream BResponse);
}
客户端创建一个连接到此服务的通道,并使用从不同线程调用各种 RPC,如下所示:
class FooClient {
// ...
void FunctionalityA() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::ARequest request;
example::AResponse response;
auto reader = stub->FunctionalityA(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
void FunctionalityB() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::BRequest request;
example::BResponse response;
auto reader = stub->FunctionalityB(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
// ...
};
int main() {
// ...
FooClient client(grpc::CreateChannel("127.0.0.1:12345", grpc::InsecureChannelCredentials()));
auto ta = std::thread(&FooClient::FunctionalityA, &client);
auto tb = std::thread(&FooClient::FunctionalityB, &client);
// ...
}
我想实现服务器以便:
- 调用 FunctionalityA 时,它开始流式传输 AResponse 类型的对象
- 调用 FunctionalityB 时,它开始流式传输 BResponse 类型的对象
- 当用于调用 FunctionalityA 的上下文被取消时,AResponse 的流式传输结束
- 当用于调用 FunctionalityB 的上下文被取消时,BResponse 的流式传输结束
我面临的问题是,即使与两个功能之一关联的 ClientContext 超出范围(在示例中读取 3 次之后),服务器也不会收到任何信息并继续写入,并且“ok”状态保持为真。 “ok”状态变为 false 并允许我仅在客户端断开连接时停止写入。
这是 gRPC 的预期行为吗?客户端是否需要发送特定的“死亡之吻”消息以通知服务器停止写入流?
为了完整起见,这里是功能服务器端的实现示例:
void FunctionalityB::ProcessRequest(bool ok, RequestState state) {
if(!ok) {
if(state == RequestState::START) {
// the server has been Shutdown before this particular call got matched to an incoming RPC
delete this;
} else if(state == RequestState::WRITE || state == RequestState::FINISH) {
// not going to the wire because the call is already dead (i.e., canceled, deadline expired, other side dropped the channel, etc).
delete this;
} else {
// unhandled state
}
} else {
if(state == RequestState::START) {
// the RPC has indeed been started
m_writer.Write(m_response, CreateTag(RequestState::WRITE));
// the constructor of the functionality requests a new one to handle future new connections
new FunctionalityB(m_completion_queue, m_service, m_worker);
} else if(state == RequestState::WRITE) {
// TODO do some real work
std::this_thread::sleep_for(std::chrono::milliseconds(50));
m_writer.Write(m_response, CreateTag(RequestState::WRITE)); // this write will continue forever, even after client stops reading and TryCancel its context
} else if(state == RequestState::FINISH) {
delete this;
} else {
// unhandled state
}
}
}
有两种方法可以检测服务器上的呼叫取消。
第一个是检查ServerContext::IsCancelled()
。这是您可以在写入之前检查的内容,在这种情况下可能没问题。但是,在一般情况下,它可能并不理想,因为您的应用程序在执行另一次写入之前可能正在等待其他事件(除了先前的写入完成),并且理想情况下您希望在取消时获得通知的异步方式发生了。
这让我想到了第二种方法,即当通过调用 ServerContext::AsyncNotifyWhenDone()
before the RPC starts. This will give you async notification of the cancellation, but unfortunately, the API is very cumbersome and has a few sharp edges. (This is something that is handled much more cleanly in the new callback-based API, but that API isn't that performant in OSS until we finish the EventEngine 努力取消调用时,在完成队列中请求一个事件。)
希望此信息对您有所帮助。