gRPC 服务器如何注意到客户端取消了服务器端流调用?

How can a gRPC server notice that the client has cancelled a server-side streaming call?

我想使用 gRPC 让客户端订阅服务器生成的事件。我有一个这样声明的 RPC:

rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);

其中返回的流是无限的。 "unsubscribe",客户端取消 RPC(顺便说一下,有没有更简洁的方法?)。

我已经弄明白客户端取消调用的方法了:

Context.CancellableContext cancellableContext =
         Context.current().withCancellation();
cancellableContext.run(() -> {
   stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());

但是,服务器似乎没有注意到客户端已取消其调用。我正在使用虚拟服务器实现对此进行测试:

@Override
public void subscribe(SubscribeRequest request,
                      StreamObserver<SubscribeResponse> responseObserver) {
  // in real code, this will happen in a separate thread.
  while (!Thread.interrupted()) {
    responseObserver.onNext(SubscribeResponse.getDefaultInstance());
  }
}

服务器将愉快地继续将其消息发送到以太网络中。服务器如何识别调用已被客户端取消并因此停止发送响应?

我自己找到了答案。您将传递给订阅的 StreamObserver 转换为 ServerCallStreamObserver,它公开方法 isCancelledsetOnCancelHandler.

scso = ((ServerCallStreamObserver<SubscribeResponse>) responseObserver);

scso.setOnCancelHandler(handler);
// or
if (scso.isCancelled()) {
  // do whatever
}

对我来说,这引出了为什么 subscribe 没有通过 ServerCallStreamObserver 开头的问题。