gRPC 中的内存泄漏 async_client
Memory leak in gRPC async_client
我使用 gRPC
async client
的方式与 example 类似。
在这个例子中(发表于gRPC
官方github
)客户端为要发送的消息分配内存,使用tag
地址作为completion queue
,并且当消息在侦听器线程中被回复时,内存(由 tag
- 地址已知)是空闲的。
我担心服务器不响应消息和内存永远不会空闲的情况。
gRPC
能保护我免受这种情况的影响吗?
- 我应该以不同的方式实施吗? (使用智能 pointers/save 数据中的指针 structure/etc...)
异步客户端发送函数
void SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
线程的异步客户端接收函数
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// Once we're complete, deallocate the call object.
delete call;
}
}
主要
int main(int argc, char** argv) {
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
for (int i = 0; i < 100; i++) {
std::string user("world " + std::to_string(i));
greeter.SayHello(user); // The actual RPC call!
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}
Does the gRPC protect me from this situation?
有点。 gRPC 保证所有排队的操作迟早会在它们匹配的完成队列中结束。所以你的代码是可以的,只要:
- 在不幸的时候没有抛出异常。
- 您不更改创建不包括排队操作或删除调用的代码路径的代码。
换句话说:没问题,但很脆弱。
选项A:
想要真正的健壮,方法是std::shared_ptr<>
。但是,它们可能会以意想不到的方式影响多线程性能。因此,它是否值得取决于您的应用程序在性能与稳健性范围内的位置。
这样的重构看起来像:
AsyncClientCall
继承自 std::enable_shared_from_this
- 将
call
的构造改为std::make_shared<AsyncClientCall>()
- 在完成队列处理程序中,增加引用计数:
while (cq_.Next(&got_tag, &ok)) {
auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();
并去掉 delete
,很明显。
选项 B:
你也可以通过 unique_ptr<>
:
auto call = std::make_unique<AsyncClientCall>();
...
call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());
和
std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};
这可以在维护其他所有内容的同时防止重构和异常。但是,这仅适用于产生单个完成事件的一元 rpc。流式 rpc 或交换元数据的 rpc 将需要完全不同的处理方式。
我使用 gRPC
async client
的方式与 example 类似。
在这个例子中(发表于gRPC
官方github
)客户端为要发送的消息分配内存,使用tag
地址作为completion queue
,并且当消息在侦听器线程中被回复时,内存(由 tag
- 地址已知)是空闲的。
我担心服务器不响应消息和内存永远不会空闲的情况。
gRPC
能保护我免受这种情况的影响吗?- 我应该以不同的方式实施吗? (使用智能 pointers/save 数据中的指针 structure/etc...)
异步客户端发送函数
void SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
线程的异步客户端接收函数
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// Once we're complete, deallocate the call object.
delete call;
}
}
主要
int main(int argc, char** argv) {
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
for (int i = 0; i < 100; i++) {
std::string user("world " + std::to_string(i));
greeter.SayHello(user); // The actual RPC call!
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}
Does the gRPC protect me from this situation?
有点。 gRPC 保证所有排队的操作迟早会在它们匹配的完成队列中结束。所以你的代码是可以的,只要:
- 在不幸的时候没有抛出异常。
- 您不更改创建不包括排队操作或删除调用的代码路径的代码。
换句话说:没问题,但很脆弱。
选项A:
想要真正的健壮,方法是std::shared_ptr<>
。但是,它们可能会以意想不到的方式影响多线程性能。因此,它是否值得取决于您的应用程序在性能与稳健性范围内的位置。
这样的重构看起来像:
AsyncClientCall
继承自std::enable_shared_from_this
- 将
call
的构造改为std::make_shared<AsyncClientCall>()
- 在完成队列处理程序中,增加引用计数:
while (cq_.Next(&got_tag, &ok)) {
auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();
并去掉 delete
,很明显。
选项 B:
你也可以通过 unique_ptr<>
:
auto call = std::make_unique<AsyncClientCall>();
...
call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());
和
std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};
这可以在维护其他所有内容的同时防止重构和异常。但是,这仅适用于产生单个完成事件的一元 rpc。流式 rpc 或交换元数据的 rpc 将需要完全不同的处理方式。