当客户端取消服务调用时,不会触发 gRPC 服务的 Context CancellationListener

gRPC services's Context CancellationListener is not fired when client cancels a service call

我有一个流媒体服务,可以无限期地从服务器流式传输到客户端,直到客户端取消。

在服务器端,我有一个线程使用来自数据库的数据填充 ehcache。

Ehcache 提供对缓存事件的回调,即添加项目时、删除项目时等。我只关心在将元素放入缓存时通知客户端,因此当客户端连接到我的gRPC 服务,我在缓存中注册了一个 notifyElementPut() 回调,它引用了已连接的客户端 StreamObserver:

public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {


  private StreamObserver<FooUpdateResponse> responseObserver;

  public GrpcAwareCacheEventListener(
      StreamObserver<FooUpdateResponse> responseObserver) {
    this.responseObserver = responseObserver;
  }


  @Override
  public void notifyElementPut(Ehcache cache, Element element) throws CacheException {

    Foo foo = (Foo) element.getObjectValue();
    if (foo != null) {
      responseObserver.onNext(
          FooResponse.newBuilder().setFoo(foo).build());
    }
  }
}

我的流媒体foo服务如下:

    public void streamFooUpdates(Empty request,
              StreamObserver<FooResponse> responseObserver) {

            final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
            fooCache.getCacheEventNotificationService().registerListener(eventListener);
            Context.current().withCancellation().addListener(new CancellationListener() {

              public void cancelled(Context context) {
    log.info("inside context cancelled callback");      
  fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
              }

            }, ForkJoinPool.commonPool());



          }

一切正常,只要连接上,客户端就会收到所有 foo 更新的通知。

但是,在客户端断开连接或明确取消调用后,我预计服务器的上下文取消侦听器会触发,从而取消注册缓存的回调。

不是这样的,不管客户端是关闭通道,还是显式取消调用。 (我希望服务器端取消的上下文会为这两个事件触发)。我想知道我在客户端的取消语义是否不正确,这是我的客户端代码,取自测试用例:

Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
        .usePlaintext().build();

    FooServiceGrpc.FooService stub = FooServiceGrpc
        .newStub(channel);


    ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
      public void onNext(FooResponse response) {
        log.info("received foo: {}", response.getFoo());
      }

      public void onError(Throwable throwable) {

      }

      public void onCompleted() {

      }

      public boolean isReady() {
        return false;
      }

      public void setOnReadyHandler(Runnable runnable) {

      }

      public void disableAutoInboundFlowControl() {

      }

      public void request(int i) {

      }

      public void setMessageCompression(boolean b) {

      }

      public void cancel(@Nullable String s, @Nullable Throwable throwable) {

      }
    };

    stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
    Thread.sleep(10000); // sleep 10 seconds while messages are received.
    cancellableObserver.cancel("cancelling from test", null); //explicit cancel
    ((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.

    Thread.sleep(7000); //channel should be shutdown by now.

  }

我想知道为什么服务器没有触发 "Context cancelled" 回调。

谢谢!

您没有正确取消客户端调用。 stub.streamFooUpdates() 的第二个参数上的 StreamObserver 是您的回调。你不应该在 StreamObserver.

上调用任何东西

有两种方法可以从客户端取消调用。

选项 1:传递一个 ClientResponseObserver 作为第二个参数,实现 beforeStart(),它给你一个 ClientCallStreamObserver,你可以调用 cancel().

方案二:运行stub.streamFooUpdates()里面一个CancellableContext,取消Context取消调用。请注意,必须始终取消 CancellableContext,这就是 finally 块的用途。

CancellableContext withCancellation = Context.current().withCancellation();
try {
  withCancellation.run(() -> {
      stub.streamFooUpdates(...);
      Thread.sleep(10000);
      withCancellation.cancel(null);
  });
} finally {
  withCancellation.cancel(null);
}