gRPC:long-运行 流的最佳实践是什么?

gRPC: What are the best practices for long-running streaming?

我们已经实现了一个在云中运行的 Java gRPC 服务,具有单向(客户端到服务器)流式 RPC,如下所示:

rpc PushUpdates(stream Update) returns (Ack);

C++ 客户端(移动设备)在启动后立即调用此 rpc,每 30 秒左右连续发送一次更新,只要设备已启动并且 运行。

ChannelArguments chan_args;
// this will be secure channel eventually 
auto channel_p = CreateCustomChannel(remote_addr, InsecureChannelCredentials(), chan_args);
auto stub_p    = DialTcc::NewStub(channel_p);
// ...

Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p      = stub_p->PushUpdates(strm_ctxt_p.get(), &ack);
// ...

While(true) {
    // wait until we are ready to send a new update
    Update updt;
    // populate updt;
    if(!strm_p->Write(updt)) {
        // stream is not kosher, create a new one and restart
        break;
    }
}

现在发生这种情况时会发生不同类型的网络中断:

我们已经看到,在此类事件中,通道和 Write() API 都无法可靠地检测到网络断开。有时客户端会继续调用 Write()(不会 return false),但服务器不会收到任何数据(wireshark 不会显示任何 activity客户端设备的传出端口)。

在此类情况下恢复的最佳做法是什么,以便服务器在此类事件发生后 X 秒 内开始接收更新?可以理解的是,每当发生此类事件时,都会丢失 X 秒 有价值的数据,但我们希望在 X 秒内可靠地恢复。

gRPC 版本:1.30.2,客户端:C++-14/Linux,服务器:Java/Linux

以下是我们破解它的方法。我想看看这是否可以做得更好,或者 gRPC 的任何人都可以指导我更好的解决方案。

我们服务的 protobuf 如下所示。它有一个用于 ping 服务的 RPC,经常用于测试连接。

// Message used in IsAlive RPC
message Empty {}

// Acknowledgement sent by the service for updates received
message UpdateAck {}

// Messages streamed to the service by the client
message Update {
...
...
}

service GrpcService {
  // for checking if we're able to connect
  rpc Ping(Empty) returns (Empty); 

  // streaming RPC for pushing updates by client
  rpc PushUpdate(stream Update) returns (UpdateAck);
}

这是 c++ 客户端的外观,它执行以下操作:

  • Connect():

    • 创建用于调用 RPC 的存根,如果存根是 nullptr
    • 定期调用Ping()直到成功。
    • 成功调用 PushUpdate(...) RPC 以创建新流。
    • 失败时将流重置为 nullptr
  • Stream():执行以下 while(true) 循环:

    • 获取要推送的更新。
    • 在要推送更新的流上调用 Write(...)
    • 如果 Write(...) 由于任何原因中断并且控制返回到 Connect()
    • 每 30 分钟(或某个固定时间间隔)一次,将所有内容(存根、频道、流)重置为 nullptr 以重新开始。这是必需的,因为有时即使客户端和服务之间没有连接,Write(...) 也不会失败。 Write(...) 调用成功但客户端上的传出端口在 wireshark 上没有显示任何 activity!

代码如下:

constexpr GRPC_TIMEOUT_S = 10;
constexpr RESTART_INTERVAL_M = 15;
constexpr GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca, tls_key, tls_cert; // for SSL
string remote_addr = "https://remote.com:5445";
...
...
void ResetStreaming() {
  if (stub_p != nullptr) {
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
    stub_p      = nullptr;
    channel_p   = nullptr;
  }
}

void CreateStub() {
  if (stub_p == nullptr) {
    ChannelArguments chan_args;
    chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_KEEPALIVE_TIME_MS);
    channel_p = CreateCustomChannel(
        remote_addr,
        SslCredentials(SslCredentialsOptions{root_ca, tls_key, tls_cert}),
        chan_args);
    stub_p = GrpcService::NewStub(m_channel_p);
  }
}

void Stream() {
  const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
  while (!stop) {
    // restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
    if (steady_clock::now() > restart_time) {
      break;
    }
    Update updt = GetUpdate(); // get the update to be sent
    if (!stop) {
      if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
                 !strm_p->Write(updt)) {
        // could not write!!
        return;  // we will Connect() again
      }
    }
  }
  // stopped due to stop = true or interval to create new stream has expired
  ResetStreaming();  // channel, stub, stream are recreated once in every 15m
}

bool PingRemote() {
  ClientContext ctxt;
  ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
  Empty req, resp;
  CreateStub();
  if (stub_p->Ping(&ctxt, req, &resp).ok()) {
    static UpdateAck ack;
    strm_ctxt_p = make_unique<ClientContext>();  // need new context
    strm_p      = stub_p->PushUpdate(strm_ctxt_p.get(), &ack);
    return true;
  }
  if (strm_p) {
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
  }
  return false;
}

void Connect() {
  while (!stop) {
    if (PingRemote() || stop) {
      break;
    }
    sleep_for(seconds(5)); // wait before retrying
  }
}

// set to true from another thread when we want to stop
atomic<bool> stop = false;

void StreamUntilStopped() {
  if (stop) {
    return;
  }
  strm_thread_p = make_unique<thread>([&] {
    while (!stop) {
      Connect();
      Stream();
    }
  });
}

// called by the thread that sets stop = true
void Finish() {
  strm_thread_p->join();
}

据此我们看到,每当 any[=68] 中断时,流式传输会在 15 分钟(或 RESTART_INTERVAL_M)内恢复 =] 原因。此代码以快速路径运行,所以我很想知道是否可以做得更好。