C++ GRPC ClientAsyncReaderWriter:如何检查数据是否可供读取?
C++ GRPC ClientAsyncReaderWriter: how to check if data is available for read?
我有使用 ClientAsyncReaderWriter 与服务器通信的双向流式异步 grpc 客户端。 RPC 代码如下所示:
rpc Process (stream Request) returns (stream Response)
为简单起见,请求和响应是字节数组 (byte[]
)。我向服务器发送几块数据,当服务器积累足够的数据时,服务器处理这些数据并发回响应并继续为下一个响应积累数据。几次响应后,服务器发送最终响应并关闭连接。
对于异步客户端,我使用 CompletionQueue
。代码如下:
...
CompletionQueue cq;
std::unique_ptr<Stub> stub;
grpc::ClientContext context;
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request,Response>> responder = stub->AsyncProcess(&context, &cq, handler);
// thread for completition queue
std::thread t(
[]{
void *handler = nullptr;
bool ok = false;
while (cq_.Next(&handler, &ok)) {
if (can_read) {
// how do you know that it is read data available
// Do read
} else {
// do write
...
Request request = prepare_request();
responder_->Write(request, handler);
}
}
}
);
...
// wait
异步阅读的正确方法是什么?如果没有可用数据,我可以尝试阅读吗?是阻塞调用吗?
排序 Read() 调用
Can I try to read if it no data available?
是的,而且这种情况经常发生。 Read()
在数据可用之前什么都不做,然后才将其传递的标签放入完成队列。 (详情见下文)
Is it blocking call?
没有。 Read()
和 Write()
return 立即。但是,您在任何给定时刻只能让其中一架飞行。如果您尝试在前一个完成之前发送第二个,它(第二个)将失败。
What is the proper way to async reading?
每完成一个 Read()
,就开始一个新的。为此,您需要能够判断 Read()
何时完成。这就是标签的用武之地!
当您调用 Read(&msg, tag)
或 Write(request, tag)
时,您是在告诉 grpc 在该操作完成后将 tag
放入与该响应者关联的完成队列中。 grpc 不关心标签是什么,它只是传递它。
所以您要采用的一般策略是:
- 一旦您准备好开始接收消息:
- 调用
responder->Read()
一次,使用一些您将识别为“已读完成”的标签。
- 每当
cq_.Next()
还给你那个标签,并且 ok == true
:
- 消费消息
- 使用相同标签排队新的
responder->Read()
。
显然,您还需要对 Write()
.
的调用执行类似的操作
但是由于您仍然希望能够从给定标签中查找处理程序实例,因此您需要一种方法来打包对处理程序的引用以及有关在单个标签中完成哪个操作的信息.
完成队列
从给定的标签中查找处理程序实例?为什么?
完成队列真正的 存在理由 不幸的是,这些例子并不明显。它们允许多个异步 rpc 共享同一个线程。除非您的应用程序只进行一次 rpc 调用,否则处理线程不应与特定的响应者相关联。相反,该线程应该是一个通用工作线程,它根据标签的内容将事件分派到正确的处理程序。
官方示例倾向于使用指向处理程序对象的指针作为标记来做到这一点。当预期有特定的事件序列时,这会起作用,因为您可以轻松预测处理程序的反应。您通常不能使用异步双向流来做到这一点,因为任何给定的完成事件都可能是 Read()
或 Write()
完成。
例子
以下是我个人认为是完成所有这些工作的简洁方法的概述:
// Base class for async bidir RPCs handlers.
// This is so that the handling thread is not associated with a specific rpc method.
class RpcHandler {
// This will be used as the "tag" argument to the various grpc calls.
struct TagData {
enum class Type {
start_done,
read_done,
write_done,
// add more as needed...
};
RpcHandler* handler;
Type evt;
};
struct TagSet {
TagSet(RpcHandler* self)
: start_done{self, TagData::Type::start_done},
read_done{self, TagData::Type::read_done},
write_done{self, TagData::Type::write_done} {}
TagData start_done;
TagData read_done;
TagData write_done;
};
public:
RpcHandler() : tags(this) {}
virtual ~RpcHandler() = default;
// The actual tag objects we'll be passing
TagSet tags;
virtual void on_ready() = 0;
virtual void on_recv() = 0;
virtual void on_write_done() = 0;
static void handling_thread_main(grpc::CompletionQueue* cq) {
void* raw_tag = nullptr;
bool ok = false;
while (cq->Next(&raw_tag, &ok)) {
TagData* tag = reinterpret_cast<TagData*>(raw_tag);
if(!ok) {
// Handle error
}
else {
switch (tag->evt) {
case TagData::Type::start_done:
tag->handler->on_ready();
break;
case TagData::Type::read_done:
tag->handler->on_recv();
break;
case TagData::Type::write_done:
tag->handler->on_write_done();
break;
}
}
}
}
};
void do_something_with_response(Response const&);
class MyHandler final : public RpcHandler {
public:
using responder_ptr =
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
// This lock is needed because StartCall() can
// cause the handler thread to access the object.
std::lock_guard lock(mutex_);
responder_->StartCall(&tags.start_done);
}
~MyHandler() {
// TODO: finish/abort the streaming rpc as appropriate.
}
void send(const Request& msg) {
std::lock_guard lock(mutex_);
if (!sending_) {
sending_ = true;
responder_->Write(msg, &tags.write_done);
} else {
// TODO: add some form of synchronous wait, or outright failure
// if the queue starts to get too big.
queued_msgs_.push(msg);
}
}
private:
// When the rpc is ready, queue the first read
void on_ready() override {
std::lock_guard l(mutex_); // To synchronize with the constructor
responder_->Read(&incoming_, &tags.read_done);
};
// When a message arrives, use it, and start reading the next one
void on_recv() override {
// incoming_ never leaves the handling thread, so no need to lock
// ------ If handling is cheap and stays in the handling thread.
do_something_with_response(incoming_);
responder_->Read(&incoming_, &tags.read_done);
// ------ If responses is expensive or involves another thread.
// Response msg = std::move(incoming_);
// responder_->Read(&incoming_, &tags.read_done);
// do_something_with_response(msg);
};
// When has been sent, send the next one is there is any
void on_write_done() override {
std::lock_guard lock(mutex_);
if (!queued_msgs_.empty()) {
responder_->Write(queued_msgs_.front(), &tags.write_done);
queued_msgs_.pop();
} else {
sending_ = false;
}
};
responder_ptr responder_;
// Only ever touched by the handler thread post-construction.
Response incoming_;
bool sending_ = false;
std::queue<Request> queued_msgs_;
std::mutex mutex_; // grpc might be thread-safe, MyHandler isn't...
};
int main() {
// Start the thread as soon as you have a completion queue.
auto cq = std::make_unique<grpc::CompletionQueue>();
std::thread t(RpcHandler::handling_thread_main, cq.get());
// Multiple concurent RPCs sharing the same handling thread:
MyHandler handler1(serviceA->MethodA(&context, cq.get()));
MyHandler handler2(serviceA->MethodA(&context, cq.get()));
MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));
}
如果您的眼睛很敏锐,您会注意到上面的代码在处理程序中存储了一堆(每个事件类型一个)冗余 this
指针。这通常没什么大不了的,但是可以通过多重继承和向下转换来避免它们,但这开始有点超出了这个问题的范围。
我有使用 ClientAsyncReaderWriter 与服务器通信的双向流式异步 grpc 客户端。 RPC 代码如下所示:
rpc Process (stream Request) returns (stream Response)
为简单起见,请求和响应是字节数组 (byte[]
)。我向服务器发送几块数据,当服务器积累足够的数据时,服务器处理这些数据并发回响应并继续为下一个响应积累数据。几次响应后,服务器发送最终响应并关闭连接。
对于异步客户端,我使用 CompletionQueue
。代码如下:
...
CompletionQueue cq;
std::unique_ptr<Stub> stub;
grpc::ClientContext context;
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request,Response>> responder = stub->AsyncProcess(&context, &cq, handler);
// thread for completition queue
std::thread t(
[]{
void *handler = nullptr;
bool ok = false;
while (cq_.Next(&handler, &ok)) {
if (can_read) {
// how do you know that it is read data available
// Do read
} else {
// do write
...
Request request = prepare_request();
responder_->Write(request, handler);
}
}
}
);
...
// wait
异步阅读的正确方法是什么?如果没有可用数据,我可以尝试阅读吗?是阻塞调用吗?
排序 Read() 调用
Can I try to read if it no data available?
是的,而且这种情况经常发生。 Read()
在数据可用之前什么都不做,然后才将其传递的标签放入完成队列。 (详情见下文)
Is it blocking call?
没有。 Read()
和 Write()
return 立即。但是,您在任何给定时刻只能让其中一架飞行。如果您尝试在前一个完成之前发送第二个,它(第二个)将失败。
What is the proper way to async reading?
每完成一个 Read()
,就开始一个新的。为此,您需要能够判断 Read()
何时完成。这就是标签的用武之地!
当您调用 Read(&msg, tag)
或 Write(request, tag)
时,您是在告诉 grpc 在该操作完成后将 tag
放入与该响应者关联的完成队列中。 grpc 不关心标签是什么,它只是传递它。
所以您要采用的一般策略是:
- 一旦您准备好开始接收消息:
- 调用
responder->Read()
一次,使用一些您将识别为“已读完成”的标签。
- 调用
- 每当
cq_.Next()
还给你那个标签,并且ok == true
:- 消费消息
- 使用相同标签排队新的
responder->Read()
。
显然,您还需要对 Write()
.
但是由于您仍然希望能够从给定标签中查找处理程序实例,因此您需要一种方法来打包对处理程序的引用以及有关在单个标签中完成哪个操作的信息.
完成队列
从给定的标签中查找处理程序实例?为什么?
完成队列真正的 存在理由 不幸的是,这些例子并不明显。它们允许多个异步 rpc 共享同一个线程。除非您的应用程序只进行一次 rpc 调用,否则处理线程不应与特定的响应者相关联。相反,该线程应该是一个通用工作线程,它根据标签的内容将事件分派到正确的处理程序。
官方示例倾向于使用指向处理程序对象的指针作为标记来做到这一点。当预期有特定的事件序列时,这会起作用,因为您可以轻松预测处理程序的反应。您通常不能使用异步双向流来做到这一点,因为任何给定的完成事件都可能是 Read()
或 Write()
完成。
例子
以下是我个人认为是完成所有这些工作的简洁方法的概述:
// Base class for async bidir RPCs handlers.
// This is so that the handling thread is not associated with a specific rpc method.
class RpcHandler {
// This will be used as the "tag" argument to the various grpc calls.
struct TagData {
enum class Type {
start_done,
read_done,
write_done,
// add more as needed...
};
RpcHandler* handler;
Type evt;
};
struct TagSet {
TagSet(RpcHandler* self)
: start_done{self, TagData::Type::start_done},
read_done{self, TagData::Type::read_done},
write_done{self, TagData::Type::write_done} {}
TagData start_done;
TagData read_done;
TagData write_done;
};
public:
RpcHandler() : tags(this) {}
virtual ~RpcHandler() = default;
// The actual tag objects we'll be passing
TagSet tags;
virtual void on_ready() = 0;
virtual void on_recv() = 0;
virtual void on_write_done() = 0;
static void handling_thread_main(grpc::CompletionQueue* cq) {
void* raw_tag = nullptr;
bool ok = false;
while (cq->Next(&raw_tag, &ok)) {
TagData* tag = reinterpret_cast<TagData*>(raw_tag);
if(!ok) {
// Handle error
}
else {
switch (tag->evt) {
case TagData::Type::start_done:
tag->handler->on_ready();
break;
case TagData::Type::read_done:
tag->handler->on_recv();
break;
case TagData::Type::write_done:
tag->handler->on_write_done();
break;
}
}
}
}
};
void do_something_with_response(Response const&);
class MyHandler final : public RpcHandler {
public:
using responder_ptr =
std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
// This lock is needed because StartCall() can
// cause the handler thread to access the object.
std::lock_guard lock(mutex_);
responder_->StartCall(&tags.start_done);
}
~MyHandler() {
// TODO: finish/abort the streaming rpc as appropriate.
}
void send(const Request& msg) {
std::lock_guard lock(mutex_);
if (!sending_) {
sending_ = true;
responder_->Write(msg, &tags.write_done);
} else {
// TODO: add some form of synchronous wait, or outright failure
// if the queue starts to get too big.
queued_msgs_.push(msg);
}
}
private:
// When the rpc is ready, queue the first read
void on_ready() override {
std::lock_guard l(mutex_); // To synchronize with the constructor
responder_->Read(&incoming_, &tags.read_done);
};
// When a message arrives, use it, and start reading the next one
void on_recv() override {
// incoming_ never leaves the handling thread, so no need to lock
// ------ If handling is cheap and stays in the handling thread.
do_something_with_response(incoming_);
responder_->Read(&incoming_, &tags.read_done);
// ------ If responses is expensive or involves another thread.
// Response msg = std::move(incoming_);
// responder_->Read(&incoming_, &tags.read_done);
// do_something_with_response(msg);
};
// When has been sent, send the next one is there is any
void on_write_done() override {
std::lock_guard lock(mutex_);
if (!queued_msgs_.empty()) {
responder_->Write(queued_msgs_.front(), &tags.write_done);
queued_msgs_.pop();
} else {
sending_ = false;
}
};
responder_ptr responder_;
// Only ever touched by the handler thread post-construction.
Response incoming_;
bool sending_ = false;
std::queue<Request> queued_msgs_;
std::mutex mutex_; // grpc might be thread-safe, MyHandler isn't...
};
int main() {
// Start the thread as soon as you have a completion queue.
auto cq = std::make_unique<grpc::CompletionQueue>();
std::thread t(RpcHandler::handling_thread_main, cq.get());
// Multiple concurent RPCs sharing the same handling thread:
MyHandler handler1(serviceA->MethodA(&context, cq.get()));
MyHandler handler2(serviceA->MethodA(&context, cq.get()));
MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));
}
如果您的眼睛很敏锐,您会注意到上面的代码在处理程序中存储了一堆(每个事件类型一个)冗余 this
指针。这通常没什么大不了的,但是可以通过多重继承和向下转换来避免它们,但这开始有点超出了这个问题的范围。