gRPC:long-运行 流的最佳实践是什么?
gRPC: What are the best practices for long-running streaming?
我们已经实现了一个在云中运行的 Java gRPC 服务,具有单向(客户端到服务器)流式 RPC,如下所示:
rpc PushUpdates(stream Update) returns (Ack);
C++ 客户端(移动设备)在启动后立即调用此 rpc,每 30 秒左右连续发送一次更新,只要设备已启动并且 运行。
ChannelArguments chan_args;
// this will be secure channel eventually
auto channel_p = CreateCustomChannel(remote_addr, InsecureChannelCredentials(), chan_args);
auto stub_p = DialTcc::NewStub(channel_p);
// ...
Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p = stub_p->PushUpdates(strm_ctxt_p.get(), &ack);
// ...
While(true) {
// wait until we are ready to send a new update
Update updt;
// populate updt;
if(!strm_p->Write(updt)) {
// stream is not kosher, create a new one and restart
break;
}
}
现在发生这种情况时会发生不同类型的网络中断:
- 云中的 gRPC 服务 运行 可能会停机(进行维护)或者可能只是变得无法访问。
- 由于是移动设备,设备自身的IP地址不断变化。
我们已经看到,在此类事件中,通道和 Write()
API 都无法可靠地检测到网络断开。有时客户端会继续调用 Write()
(不会 return false
),但服务器不会收到任何数据(wireshark 不会显示任何 activity客户端设备的传出端口)。
在此类情况下恢复的最佳做法是什么,以便服务器在此类事件发生后 X 秒 内开始接收更新?可以理解的是,每当发生此类事件时,都会丢失 X 秒 有价值的数据,但我们希望在 X 秒内可靠地恢复。
gRPC 版本:1.30.2,客户端:C++-14/Linux,服务器:Java/Linux
以下是我们破解它的方法。我想看看这是否可以做得更好,或者 gRPC 的任何人都可以指导我更好的解决方案。
我们服务的 protobuf 如下所示。它有一个用于 ping 服务的 RPC,经常用于测试连接。
// Message used in IsAlive RPC
message Empty {}
// Acknowledgement sent by the service for updates received
message UpdateAck {}
// Messages streamed to the service by the client
message Update {
...
...
}
service GrpcService {
// for checking if we're able to connect
rpc Ping(Empty) returns (Empty);
// streaming RPC for pushing updates by client
rpc PushUpdate(stream Update) returns (UpdateAck);
}
这是 c++ 客户端的外观,它执行以下操作:
Connect()
:
- 创建用于调用 RPC 的存根,如果存根是
nullptr
。
- 定期调用
Ping()
直到成功。
- 成功调用
PushUpdate(...)
RPC 以创建新流。
- 失败时将流重置为
nullptr
。
Stream()
:执行以下 while(true)
循环:
- 获取要推送的更新。
- 在要推送更新的流上调用
Write(...)
。
- 如果
Write(...)
由于任何原因中断并且控制返回到 Connect()
。
- 每 30 分钟(或某个固定时间间隔)一次,将所有内容(存根、频道、流)重置为
nullptr
以重新开始。这是必需的,因为有时即使客户端和服务之间没有连接,Write(...)
也不会失败。 Write(...)
调用成功但客户端上的传出端口在 wireshark 上没有显示任何 activity!
代码如下:
constexpr GRPC_TIMEOUT_S = 10;
constexpr RESTART_INTERVAL_M = 15;
constexpr GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca, tls_key, tls_cert; // for SSL
string remote_addr = "https://remote.com:5445";
...
...
void ResetStreaming() {
if (stub_p != nullptr) {
strm_p = nullptr;
strm_ctxt_p = nullptr;
stub_p = nullptr;
channel_p = nullptr;
}
}
void CreateStub() {
if (stub_p == nullptr) {
ChannelArguments chan_args;
chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_KEEPALIVE_TIME_MS);
channel_p = CreateCustomChannel(
remote_addr,
SslCredentials(SslCredentialsOptions{root_ca, tls_key, tls_cert}),
chan_args);
stub_p = GrpcService::NewStub(m_channel_p);
}
}
void Stream() {
const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
while (!stop) {
// restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
if (steady_clock::now() > restart_time) {
break;
}
Update updt = GetUpdate(); // get the update to be sent
if (!stop) {
if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
!strm_p->Write(updt)) {
// could not write!!
return; // we will Connect() again
}
}
}
// stopped due to stop = true or interval to create new stream has expired
ResetStreaming(); // channel, stub, stream are recreated once in every 15m
}
bool PingRemote() {
ClientContext ctxt;
ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
Empty req, resp;
CreateStub();
if (stub_p->Ping(&ctxt, req, &resp).ok()) {
static UpdateAck ack;
strm_ctxt_p = make_unique<ClientContext>(); // need new context
strm_p = stub_p->PushUpdate(strm_ctxt_p.get(), &ack);
return true;
}
if (strm_p) {
strm_p = nullptr;
strm_ctxt_p = nullptr;
}
return false;
}
void Connect() {
while (!stop) {
if (PingRemote() || stop) {
break;
}
sleep_for(seconds(5)); // wait before retrying
}
}
// set to true from another thread when we want to stop
atomic<bool> stop = false;
void StreamUntilStopped() {
if (stop) {
return;
}
strm_thread_p = make_unique<thread>([&] {
while (!stop) {
Connect();
Stream();
}
});
}
// called by the thread that sets stop = true
void Finish() {
strm_thread_p->join();
}
据此我们看到,每当 any[=68] 中断时,流式传输会在 15 分钟(或 RESTART_INTERVAL_M
)内恢复 =] 原因。此代码以快速路径运行,所以我很想知道是否可以做得更好。
我们已经实现了一个在云中运行的 Java gRPC 服务,具有单向(客户端到服务器)流式 RPC,如下所示:
rpc PushUpdates(stream Update) returns (Ack);
C++ 客户端(移动设备)在启动后立即调用此 rpc,每 30 秒左右连续发送一次更新,只要设备已启动并且 运行。
ChannelArguments chan_args;
// this will be secure channel eventually
auto channel_p = CreateCustomChannel(remote_addr, InsecureChannelCredentials(), chan_args);
auto stub_p = DialTcc::NewStub(channel_p);
// ...
Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p = stub_p->PushUpdates(strm_ctxt_p.get(), &ack);
// ...
While(true) {
// wait until we are ready to send a new update
Update updt;
// populate updt;
if(!strm_p->Write(updt)) {
// stream is not kosher, create a new one and restart
break;
}
}
现在发生这种情况时会发生不同类型的网络中断:
- 云中的 gRPC 服务 运行 可能会停机(进行维护)或者可能只是变得无法访问。
- 由于是移动设备,设备自身的IP地址不断变化。
我们已经看到,在此类事件中,通道和 Write()
API 都无法可靠地检测到网络断开。有时客户端会继续调用 Write()
(不会 return false
),但服务器不会收到任何数据(wireshark 不会显示任何 activity客户端设备的传出端口)。
在此类情况下恢复的最佳做法是什么,以便服务器在此类事件发生后 X 秒 内开始接收更新?可以理解的是,每当发生此类事件时,都会丢失 X 秒 有价值的数据,但我们希望在 X 秒内可靠地恢复。
gRPC 版本:1.30.2,客户端:C++-14/Linux,服务器:Java/Linux
以下是我们破解它的方法。我想看看这是否可以做得更好,或者 gRPC 的任何人都可以指导我更好的解决方案。
我们服务的 protobuf 如下所示。它有一个用于 ping 服务的 RPC,经常用于测试连接。
// Message used in IsAlive RPC
message Empty {}
// Acknowledgement sent by the service for updates received
message UpdateAck {}
// Messages streamed to the service by the client
message Update {
...
...
}
service GrpcService {
// for checking if we're able to connect
rpc Ping(Empty) returns (Empty);
// streaming RPC for pushing updates by client
rpc PushUpdate(stream Update) returns (UpdateAck);
}
这是 c++ 客户端的外观,它执行以下操作:
Connect()
:- 创建用于调用 RPC 的存根,如果存根是
nullptr
。 - 定期调用
Ping()
直到成功。 - 成功调用
PushUpdate(...)
RPC 以创建新流。 - 失败时将流重置为
nullptr
。
- 创建用于调用 RPC 的存根,如果存根是
Stream()
:执行以下while(true)
循环:- 获取要推送的更新。
- 在要推送更新的流上调用
Write(...)
。 - 如果
Write(...)
由于任何原因中断并且控制返回到Connect()
。 - 每 30 分钟(或某个固定时间间隔)一次,将所有内容(存根、频道、流)重置为
nullptr
以重新开始。这是必需的,因为有时即使客户端和服务之间没有连接,Write(...)
也不会失败。Write(...)
调用成功但客户端上的传出端口在 wireshark 上没有显示任何 activity!
代码如下:
constexpr GRPC_TIMEOUT_S = 10;
constexpr RESTART_INTERVAL_M = 15;
constexpr GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca, tls_key, tls_cert; // for SSL
string remote_addr = "https://remote.com:5445";
...
...
void ResetStreaming() {
if (stub_p != nullptr) {
strm_p = nullptr;
strm_ctxt_p = nullptr;
stub_p = nullptr;
channel_p = nullptr;
}
}
void CreateStub() {
if (stub_p == nullptr) {
ChannelArguments chan_args;
chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_KEEPALIVE_TIME_MS);
channel_p = CreateCustomChannel(
remote_addr,
SslCredentials(SslCredentialsOptions{root_ca, tls_key, tls_cert}),
chan_args);
stub_p = GrpcService::NewStub(m_channel_p);
}
}
void Stream() {
const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
while (!stop) {
// restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
if (steady_clock::now() > restart_time) {
break;
}
Update updt = GetUpdate(); // get the update to be sent
if (!stop) {
if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
!strm_p->Write(updt)) {
// could not write!!
return; // we will Connect() again
}
}
}
// stopped due to stop = true or interval to create new stream has expired
ResetStreaming(); // channel, stub, stream are recreated once in every 15m
}
bool PingRemote() {
ClientContext ctxt;
ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
Empty req, resp;
CreateStub();
if (stub_p->Ping(&ctxt, req, &resp).ok()) {
static UpdateAck ack;
strm_ctxt_p = make_unique<ClientContext>(); // need new context
strm_p = stub_p->PushUpdate(strm_ctxt_p.get(), &ack);
return true;
}
if (strm_p) {
strm_p = nullptr;
strm_ctxt_p = nullptr;
}
return false;
}
void Connect() {
while (!stop) {
if (PingRemote() || stop) {
break;
}
sleep_for(seconds(5)); // wait before retrying
}
}
// set to true from another thread when we want to stop
atomic<bool> stop = false;
void StreamUntilStopped() {
if (stop) {
return;
}
strm_thread_p = make_unique<thread>([&] {
while (!stop) {
Connect();
Stream();
}
});
}
// called by the thread that sets stop = true
void Finish() {
strm_thread_p->join();
}
据此我们看到,每当 any[=68] 中断时,流式传输会在 15 分钟(或 RESTART_INTERVAL_M
)内恢复 =] 原因。此代码以快速路径运行,所以我很想知道是否可以做得更好。