与 StreamObserver 异步交互是否安全,即使用 Java 8+ CompletableFutures?

Is it safe to interact with StreamObserver asynchronously, i.e. with Java 8+ CompletableFutures?

我正在查看 Simple RPC example from grpc.io's basic tutorial:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

从其他线程与 StreamObserver 交互有什么注意事项吗? 例如,说 checkFeature() 异步命中另一个服务,返回 CompletableFuture:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> responseObserver.onNext(feature));
  responseObserver.onCompleted();
}

当然,上面的方法是行不通的,因为第一个线程会在返回功能之前执行 onCompleted()。所以让我们解决这个问题:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> {
        responseObserver.onNext(feature);
        responseObserver.onCompleted();
      });
}

我认为这应该可行,但我是 Java 的新手,所以我想知道会有什么后果。例如,

如果有人也能指导我了解他们的推理方式,那就太好了。我尝试查找 StreamObserver 的实际实现,但我不确定要查找什么。

Will Context.current() be consistent?

Context.current() 正在使用 ThreadLocal。如果您在不同的线程上访问它,它将不一致。您可以在线程之间传播上下文。您可能会发现 有用。

Will anything cause the StreamObserver to destruct or close prematurely besides onNext() for a unary calls and onError()?

是的,StreamObserver 的正常流程以 onError 或 onCompleted 结束。

StreamObserver javadoc 所述,"Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls"。如果您并发调用 StreamObserver,则需要同步调用。换句话说,如果你确定即使你使用多线程也不会并发调用,那应该没问题。

如果在多个线程上访问同一个 StreamObserver,我会尝试同步它,除非性能很关键,因为它很容易出错。至少,它值得好评。

thenAccept()调用onNext()onCompleted()就可以了,因为观察者不是多线程并发调用的

单独调用 onCompleted() 的 "broken" 示例也被破坏,因为它可以在没有任何形式的同步的情况下从多个线程调用观察者。 StreamObservers 不能同时从多个线程调用。

虽然使用 thenAccept() 不太正确,因为它不能处理 future 失败的情况。所以你还需要接收 Throwable ,这可以用 whenComplete():

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -> {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

处理该 lambda 时,上下文很容易成为 "wrong"。通常我们会寻找 "architectural" 解决方案来确保传播上下文,例如在创建它们时将所有应用程序线程池包装在 Context.currentContextExecutor() 中,因此各个调用站点不需要关心传播。我对CompletableFuture不够熟悉,无法提供策略。