gRPC 服务器如何注意到客户端取消了服务器端流调用?
How can a gRPC server notice that the client has cancelled a server-side streaming call?
我想使用 gRPC 让客户端订阅服务器生成的事件。我有一个这样声明的 RPC:
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);
其中返回的流是无限的。 "unsubscribe",客户端取消 RPC(顺便说一下,有没有更简洁的方法?)。
我已经弄明白客户端取消调用的方法了:
Context.CancellableContext cancellableContext =
Context.current().withCancellation();
cancellableContext.run(() -> {
stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());
但是,服务器似乎没有注意到客户端已取消其调用。我正在使用虚拟服务器实现对此进行测试:
@Override
public void subscribe(SubscribeRequest request,
StreamObserver<SubscribeResponse> responseObserver) {
// in real code, this will happen in a separate thread.
while (!Thread.interrupted()) {
responseObserver.onNext(SubscribeResponse.getDefaultInstance());
}
}
服务器将愉快地继续将其消息发送到以太网络中。服务器如何识别调用已被客户端取消并因此停止发送响应?
我自己找到了答案。您将传递给订阅的 StreamObserver
转换为 ServerCallStreamObserver
,它公开方法 isCancelled
和 setOnCancelHandler
.
scso = ((ServerCallStreamObserver<SubscribeResponse>) responseObserver);
scso.setOnCancelHandler(handler);
// or
if (scso.isCancelled()) {
// do whatever
}
对我来说,这引出了为什么 subscribe
没有通过 ServerCallStreamObserver
开头的问题。
我想使用 gRPC 让客户端订阅服务器生成的事件。我有一个这样声明的 RPC:
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);
其中返回的流是无限的。 "unsubscribe",客户端取消 RPC(顺便说一下,有没有更简洁的方法?)。
我已经弄明白客户端取消调用的方法了:
Context.CancellableContext cancellableContext =
Context.current().withCancellation();
cancellableContext.run(() -> {
stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());
但是,服务器似乎没有注意到客户端已取消其调用。我正在使用虚拟服务器实现对此进行测试:
@Override
public void subscribe(SubscribeRequest request,
StreamObserver<SubscribeResponse> responseObserver) {
// in real code, this will happen in a separate thread.
while (!Thread.interrupted()) {
responseObserver.onNext(SubscribeResponse.getDefaultInstance());
}
}
服务器将愉快地继续将其消息发送到以太网络中。服务器如何识别调用已被客户端取消并因此停止发送响应?
我自己找到了答案。您将传递给订阅的 StreamObserver
转换为 ServerCallStreamObserver
,它公开方法 isCancelled
和 setOnCancelHandler
.
scso = ((ServerCallStreamObserver<SubscribeResponse>) responseObserver);
scso.setOnCancelHandler(handler);
// or
if (scso.isCancelled()) {
// do whatever
}
对我来说,这引出了为什么 subscribe
没有通过 ServerCallStreamObserver
开头的问题。