gRPC Java 客户端能否通过长期存在的 gRPC 流并行发送多个请求以及如何管理 N 个流

Can gRPC Java client send multiple requests in parallel over a long lived gRPC stream and how to manage N streams

我正在使用 'Streaming RPC' API,其中流式传输 MyRequests 和 MyResponse

service MyStreamedService {
  rpc myOperation(**stream** MyRequest) returns (**stream** MyResponse)
}

这是包装 gRPC 流的 class 的稍微简化的版本;

public class MyStreamWrapper implements StreamObserver<MyResponse> {
  public MyStreamWrapper(ManagedChannel myChannel) {
    myStub = MyStreamedServiceGrpc.newStub(myChannel);
    // create a stream and maintain a long lived reference to the stream via StreamObserver's
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onNext(MyResponse r) {
    // handle the response (not shown)
  }
  
  @Override
  public void onError(Throwable t) {
    // very unfortunate that there is no error code in this API !
    // throttle (not shown but if I don't throttle, eats CPU)
    // Create a new stream
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onCompleted() {
    // server has called StreamObserver<MyRequest>.onCompleted
    // Create a new stream using the async API
    myStream = myStub.myOperation(this);
  }
  
  // Context: many threada that want to send a request asynchronously
  public void send(MyRequest r) {
    synchronized(myStream) {
      myStream.onNext(r);
    }
  }
}

问题

  1. 为什么访问myStream需要在send方法中同步?我想了解为什么我必须同步想要在同一流上并行发送无序请求的线程。如果每个请求都打包在具有自己的流 ID 的 HTTP2 数据帧中,那么这是否只是 gRPC 客户端的 Java 实现所独有的?
  2. 线程 returns 从方法发送时保证发生了什么?
  1. 鉴于客户端线程一次同步到一个onNext 调用,客户端是否会使服务器过载或通过在上述send 方法中阻塞客户端线程来施加背压?我看到 Streams 正在关闭,并且出现诸如“内部:RST_STREAM 已关闭流。HTTP/2 错误代码:PROTOCOL_ERROR”之类的错误。
  2. 考虑到创建流的成本很低,维护和重用 myStream 是否不寻常?
  3. 考虑到流只能定向到一台服务器,我认为我需要向上面的简单 class 添加更多代码以便在创建频道时创建 N myStream 然后轮流-robin N myStream 中每一个的发送方法。不幸的是,没有 API 来确定 myStream 当前是否 'busy' 有另一个 RPC 请求。或者,我可以动态创建新流并添加一个信号量 (size:N) 来限制尝试使用它们的线程数。

我认为我理解的事情...

事实证明,所有答案都在 HTTP/2 规范 https://datatracker.ietf.org/doc/html/rfc7540# 中,可读性很强。

  1. Eric Anderson 回答“请注意,该对象不是线程安全的,因此如果您同时从多个线程调用它的方法,您应该自己做 locking/synchronization”。并没有真正解释为什么,我猜一元 API 不需要同步,所以它们没有为 Streaming API
  2. 同步
  3. 应用程序请求至少在缓冲区中,准备好在 HTTP 数据帧中传输。线程不会阻塞等待任何已收到的确认。无论如何,数据帧都不会被确认。由于 HTTP2 流量控制以防止远端过载,应用程序可能无法发送数据,但即使在这种情况下,线程似乎也不太可能被阻塞。
  4. 如上所述,https://datatracker.ietf.org/doc/html/rfc7540#section-6.9 描述了流量控制。服务器授予客户端发送一定数量字节的权限,客户端必须在使用分配后停止。
  5. 不,并不罕见。 Here, Eric Anderson says "for fully-asynchronous processing we would expect you to start any I/O on the current thread and then call methods on StreamObserver from some other thread once the I/O completes.". Duplicate of
  6. 根据上面 Eric Anderson 的回复,我认为缓存流引用并重新使用它是正常的。但是,我不知道为什么 Java 通道 API 不包含方法 int getStreamLimit() 因为客户端需要知道根据 SETTINGS_MAX_CONCURRENT_STREAMS 值允许使用的许多流在创建通道期间交换的 HTTP2 SETTINGS 消息中。

总评: 我发现 gRPC 文档很难理解,直到我意识到大多数更深层次的问题都可以通过研究 HTTP2 来回答 例如
HTTP2 请求与原型 IDL 中的应用程序请求对象不对应 1:1。 Keypoint: HTTP2 Request 在流期间持续,可以携带许多应用程序请求对象。

  1. 首先,HTTP/2 消息可能很大,因此在一般情况下将 HTTP/2 帧放在线路上不是原子操作,因此多个线程可能同时写入同一个网络套接字。此外,还有与流量控制相关的缓冲:如果流的另一端还没有准备好接受更多消息,则它们需要在您这边进行缓冲,因此多个线程可能同时写入同一个缓冲区。
  2. "request is buffered in the gRPC client" 是正确答案:StreamObserver 的方法是异步的,它们 return 非常快并且实际的网络通信将在某个时候由另一个线程执行。
  3. 发送消息时,您应该使用 CallStreamObserver: isReady() and setOnReadyHandler(...) 中的方法尊重对方的准备情况(您始终可以将出站 StreamObserver 转换为 CallStreamObserver)。如果您忽略另一方的准备情况,gRPC 将在其内部缓冲区中缓冲消息,直到另一方准备就绪。但是,由于第 1 点中描述的缓冲,这在某些情况下可能会导致内存使用过多。
    顺便说一句:你可能想看看官方 copyWithFlowControl() helper methods and my own DispatchingOnReadyHandler class.
  4. 我猜您的意图是始终打开 RPC:如果是这样,您的代码似乎没问题。然而,问题是您是否应该使用单个 bi-di 调用与多个一元调用:如果 1 个请求消息的服务器处理与其他请求消息的处理不紧密相关(即:服务器不需要不断维护单个,在- 与所有请求消息相关的内存状态),那么一元调用至少有两个原因会更好:
    4.1.不需要第 1 点中描述的同步。
    4.2.您将更好地利用服务器负载平衡。
    在正常情况下,启动新的一元 RPC 的开销很小,因为它将在 existing HTTP/2 连接上打开一个新流。
    但是,如果某些请求消息的服务器端处理可能与其他一些先前的请求消息有关,那么您确实需要客户端流。但是,您应该尽可能尝试关闭并更新 RPC,以允许服务器平衡流量。
  5. 如果您的客户端 运行 作为某些服务器应用程序的一部分,那么 grpclb 策略是最常见的负载平衡选择:它将维护多个 HTTP/2连接到几个可用的后端,每个连接可能有多个 HTTP/2 流(HTTP/2 流与 gRPC 流对应 1-1)。此外,grpclb 将主动探测这些连接以验证它们是否健康,并自动重新发出 DNS 查询(或任何其他名称解析服务,如果您使用自定义 NameResolver)以查看是否在需要时添加了任何新后端。如果要使用它,请记住包含 grpc-grpclb 运行 时间依赖性。有关服务器应用程序到服务器应用程序案例中的负载平衡和名称解析的更多信息,请参阅答案底部的注释。
    如果 android 上的客户端 运行,则 grpclb 无法正常使用(大多数 android 设备缺乏 运行 后台负载平衡器的功能,即使可能它会很快耗尽设备的电池),但您的连接通常会通过位于后端服务器前面的一些负载平衡器。在这种情况下,每个新的 RPC 通常会转到占用最少的后端。
    但是,由于您似乎只维护 1 个长期调用,因此您的所有请求消息都将转到相同的后端,直到调用被“更新”:这就是为什么我建议在前一点中尽可能使用一元调用。这比实现您自己的负载平衡要简单得多,因此如果可能的话,它应该是首选。

关于 “我理解的事情” 部分的澄清:“子频道”通常基本上是 HTTP/2 连接:一个频道可能是多个 HTTP/2 连接(取决于客户端功能:参见第 5 点)到多个后端(当然取决于服务器配置)并且每个连接可以有多个独立的流(每 1 个 gRPC 调用 1 HTTP/2 流)。

关于服务器应用到服务器应用 gRPC 的负载平衡和名称解析的一些注意事项:

  • 客户端查找可用后端的最简单和最常用的方法是通过 DNS 解析。更复杂的机制包括 xDS 或 Consul 或自定义 NameResolvers.
  • 特别是在同一个k8s集群中同时部署后端和客户端应用时,最常见的方式是将后端部署为headless service,这样所有后端pods都可以通过客户端通过有关 <backed-service-name>.<k8s-namespace>.svc.cluster.local 的集群内 DNS 查询。例如 .forTarget("myGrpcBackendService.default.svc.cluster.local:6000").
  • 默认情况下,使用 grpclb 的客户端将尝试尽可能长时间地保持他们与后端的连接集,因此即使 RPC 是短暂的,底层连接也会坚持到同一组后端.为了强制定期重新平衡,服务器可以使用 NettyServerBuilder: maxConnectionAge(...), maxConnectionAgeGrace(...) and maxConnectionIdle(...).
  • 中的方法设置最大连接生命周期