当客户端取消服务调用时,不会触发 gRPC 服务的 Context CancellationListener
gRPC services's Context CancellationListener is not fired when client cancels a service call
我有一个流媒体服务,可以无限期地从服务器流式传输到客户端,直到客户端取消。
在服务器端,我有一个线程使用来自数据库的数据填充 ehcache。
Ehcache 提供对缓存事件的回调,即添加项目时、删除项目时等。我只关心在将元素放入缓存时通知客户端,因此当客户端连接到我的gRPC 服务,我在缓存中注册了一个 notifyElementPut()
回调,它引用了已连接的客户端 StreamObserver
:
public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {
private StreamObserver<FooUpdateResponse> responseObserver;
public GrpcAwareCacheEventListener(
StreamObserver<FooUpdateResponse> responseObserver) {
this.responseObserver = responseObserver;
}
@Override
public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
Foo foo = (Foo) element.getObjectValue();
if (foo != null) {
responseObserver.onNext(
FooResponse.newBuilder().setFoo(foo).build());
}
}
}
我的流媒体foo服务如下:
public void streamFooUpdates(Empty request,
StreamObserver<FooResponse> responseObserver) {
final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
fooCache.getCacheEventNotificationService().registerListener(eventListener);
Context.current().withCancellation().addListener(new CancellationListener() {
public void cancelled(Context context) {
log.info("inside context cancelled callback");
fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
}
}, ForkJoinPool.commonPool());
}
一切正常,只要连接上,客户端就会收到所有 foo 更新的通知。
但是,在客户端断开连接或明确取消调用后,我预计服务器的上下文取消侦听器会触发,从而取消注册缓存的回调。
不是这样的,不管客户端是关闭通道,还是显式取消调用。 (我希望服务器端取消的上下文会为这两个事件触发)。我想知道我在客户端的取消语义是否不正确,这是我的客户端代码,取自测试用例:
Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
.usePlaintext().build();
FooServiceGrpc.FooService stub = FooServiceGrpc
.newStub(channel);
ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
public void onNext(FooResponse response) {
log.info("received foo: {}", response.getFoo());
}
public void onError(Throwable throwable) {
}
public void onCompleted() {
}
public boolean isReady() {
return false;
}
public void setOnReadyHandler(Runnable runnable) {
}
public void disableAutoInboundFlowControl() {
}
public void request(int i) {
}
public void setMessageCompression(boolean b) {
}
public void cancel(@Nullable String s, @Nullable Throwable throwable) {
}
};
stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
Thread.sleep(10000); // sleep 10 seconds while messages are received.
cancellableObserver.cancel("cancelling from test", null); //explicit cancel
((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.
Thread.sleep(7000); //channel should be shutdown by now.
}
我想知道为什么服务器没有触发 "Context cancelled" 回调。
谢谢!
您没有正确取消客户端调用。 stub.streamFooUpdates()
的第二个参数上的 StreamObserver
是您的回调。你不应该在 StreamObserver
.
上调用任何东西
有两种方法可以从客户端取消调用。
选项 1:传递一个 ClientResponseObserver
作为第二个参数,实现 beforeStart()
,它给你一个 ClientCallStreamObserver
,你可以调用 cancel()
.
方案二:运行stub.streamFooUpdates()
里面一个CancellableContext
,取消Context
取消调用。请注意,必须始终取消 CancellableContext
,这就是 finally
块的用途。
CancellableContext withCancellation = Context.current().withCancellation();
try {
withCancellation.run(() -> {
stub.streamFooUpdates(...);
Thread.sleep(10000);
withCancellation.cancel(null);
});
} finally {
withCancellation.cancel(null);
}
我有一个流媒体服务,可以无限期地从服务器流式传输到客户端,直到客户端取消。
在服务器端,我有一个线程使用来自数据库的数据填充 ehcache。
Ehcache 提供对缓存事件的回调,即添加项目时、删除项目时等。我只关心在将元素放入缓存时通知客户端,因此当客户端连接到我的gRPC 服务,我在缓存中注册了一个 notifyElementPut()
回调,它引用了已连接的客户端 StreamObserver
:
public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {
private StreamObserver<FooUpdateResponse> responseObserver;
public GrpcAwareCacheEventListener(
StreamObserver<FooUpdateResponse> responseObserver) {
this.responseObserver = responseObserver;
}
@Override
public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
Foo foo = (Foo) element.getObjectValue();
if (foo != null) {
responseObserver.onNext(
FooResponse.newBuilder().setFoo(foo).build());
}
}
}
我的流媒体foo服务如下:
public void streamFooUpdates(Empty request,
StreamObserver<FooResponse> responseObserver) {
final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
fooCache.getCacheEventNotificationService().registerListener(eventListener);
Context.current().withCancellation().addListener(new CancellationListener() {
public void cancelled(Context context) {
log.info("inside context cancelled callback");
fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
}
}, ForkJoinPool.commonPool());
}
一切正常,只要连接上,客户端就会收到所有 foo 更新的通知。
但是,在客户端断开连接或明确取消调用后,我预计服务器的上下文取消侦听器会触发,从而取消注册缓存的回调。
不是这样的,不管客户端是关闭通道,还是显式取消调用。 (我希望服务器端取消的上下文会为这两个事件触发)。我想知道我在客户端的取消语义是否不正确,这是我的客户端代码,取自测试用例:
Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
.usePlaintext().build();
FooServiceGrpc.FooService stub = FooServiceGrpc
.newStub(channel);
ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
public void onNext(FooResponse response) {
log.info("received foo: {}", response.getFoo());
}
public void onError(Throwable throwable) {
}
public void onCompleted() {
}
public boolean isReady() {
return false;
}
public void setOnReadyHandler(Runnable runnable) {
}
public void disableAutoInboundFlowControl() {
}
public void request(int i) {
}
public void setMessageCompression(boolean b) {
}
public void cancel(@Nullable String s, @Nullable Throwable throwable) {
}
};
stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
Thread.sleep(10000); // sleep 10 seconds while messages are received.
cancellableObserver.cancel("cancelling from test", null); //explicit cancel
((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.
Thread.sleep(7000); //channel should be shutdown by now.
}
我想知道为什么服务器没有触发 "Context cancelled" 回调。
谢谢!
您没有正确取消客户端调用。 stub.streamFooUpdates()
的第二个参数上的 StreamObserver
是您的回调。你不应该在 StreamObserver
.
有两种方法可以从客户端取消调用。
选项 1:传递一个 ClientResponseObserver
作为第二个参数,实现 beforeStart()
,它给你一个 ClientCallStreamObserver
,你可以调用 cancel()
.
方案二:运行stub.streamFooUpdates()
里面一个CancellableContext
,取消Context
取消调用。请注意,必须始终取消 CancellableContext
,这就是 finally
块的用途。
CancellableContext withCancellation = Context.current().withCancellation();
try {
withCancellation.run(() -> {
stub.streamFooUpdates(...);
Thread.sleep(10000);
withCancellation.cancel(null);
});
} finally {
withCancellation.cancel(null);
}