尝试从单独的线程多次调用 Write() 会导致崩溃 [gRPC] [C++]
Attempting to call Write() multiple times from separate thread(s) causes crash [gRPC] [C++]
我正在尝试用 C++ 编写异步流式 gRPC 服务器(遵循 this 示例),其中对 Write 的多次调用在单独的线程上执行。不幸的是,这会导致我的系统出现 SIGSEGV。服务器能够在崩溃前执行一次写入。下面的代码提供了一个我正在尝试做的简单示例。重载的调用运算符从单独的线程接收消息并执行 Write() 调用,将 MyMessage 写入流。
void MyServer::HandleRpcs() {
new CallData(&m_service, m_queue.get());
void* tag;
bool ok;
while (true) {
GPR_ASSERT(m_queue->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
void MyServer::CallData::Proceed() {
if (m_state == CREATE) {
m_state = PROCESS;
m_service->RequestRpc(&m_context, &m_request, &m_responder, m_queue, m_queue, this);
}
else if (m_state == PROCESS) {
new CallData(m_service, m_queue);
// Request the RPC here, which begins the message calls to the overloaded () operator
}
else {
GPR_ASSERT(m_state == FINISH);
delete this;
}
}
void MyServer::CallData::operator()(Message message) {
std::lock_guard<std::recursive_mutex> lock{m_serverMutex};
MyStream stream;
stream.set_message(message.payload);
m_responder.Write(stream, this);
PushTaskToQueue();
}
void MyServer::CallData::PushTaskToQueue() {
// m_alarm is a member of CallData
m_alarm.Set(m_queue, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}
原来我对gRPC和完成队列有误解。我在完成队列 returns 标记之前调用 Write(),这导致了崩溃。为了解决这个问题,我在 MyServer
中创建了一个名为 m_tag
的 static void*
成员变量,并将其传递给 Next
函数的 tag
参数,如下所示:
GPR_ASSERT(m_queue->Next(&m_tag, &ok));
然后,我检查了标记是否与重载调用运算符中处理程序的 this
指针匹配:
if (m_tag != this) return;
然后我看到我的消息流通过了。
我正在尝试用 C++ 编写异步流式 gRPC 服务器(遵循 this 示例),其中对 Write 的多次调用在单独的线程上执行。不幸的是,这会导致我的系统出现 SIGSEGV。服务器能够在崩溃前执行一次写入。下面的代码提供了一个我正在尝试做的简单示例。重载的调用运算符从单独的线程接收消息并执行 Write() 调用,将 MyMessage 写入流。
void MyServer::HandleRpcs() {
new CallData(&m_service, m_queue.get());
void* tag;
bool ok;
while (true) {
GPR_ASSERT(m_queue->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
void MyServer::CallData::Proceed() {
if (m_state == CREATE) {
m_state = PROCESS;
m_service->RequestRpc(&m_context, &m_request, &m_responder, m_queue, m_queue, this);
}
else if (m_state == PROCESS) {
new CallData(m_service, m_queue);
// Request the RPC here, which begins the message calls to the overloaded () operator
}
else {
GPR_ASSERT(m_state == FINISH);
delete this;
}
}
void MyServer::CallData::operator()(Message message) {
std::lock_guard<std::recursive_mutex> lock{m_serverMutex};
MyStream stream;
stream.set_message(message.payload);
m_responder.Write(stream, this);
PushTaskToQueue();
}
void MyServer::CallData::PushTaskToQueue() {
// m_alarm is a member of CallData
m_alarm.Set(m_queue, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}
原来我对gRPC和完成队列有误解。我在完成队列 returns 标记之前调用 Write(),这导致了崩溃。为了解决这个问题,我在 MyServer
中创建了一个名为 m_tag
的 static void*
成员变量,并将其传递给 Next
函数的 tag
参数,如下所示:
GPR_ASSERT(m_queue->Next(&m_tag, &ok));
然后,我检查了标记是否与重载调用运算符中处理程序的 this
指针匹配:
if (m_tag != this) return;
然后我看到我的消息流通过了。