gRPC "call is closed" 在服务器上调用 onNext 时出现异常

gRPC "call is closed" exception when calling onNext on server

不幸的是,这在生产中偶尔会发生,但我无法可靠地重现。

gRPC 服务器向少数客户端分发少量但频繁的更新。每个客户端使用不同的参数对同一个调用发出多个请求。这是永久流式数据,服务器永远不会有 onComplete。

调用 onNext 时,出现以下错误:

Feb 20, 2019 10:13:03 AM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall$$Lambda/1901113624@2b4ca8e3
java.lang.IllegalStateException: call is closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:174)
at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:124)
at io.grpc.ForwardingServerCall.sendMessage(ForwardingServerCall.java:32)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.access01(UncaughtExceptionServerInterceptor.java:142)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.lambda$sendMessage[=11=](UncaughtExceptionServerInterceptor.java:158)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.sendMessage(UncaughtExceptionServerInterceptor.java:158)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
at myPackage$helper.lambda$calculateChangesAndNotifyObservers(myCode.java:229)

根据 gRPC 订阅数据的请求保留观察者列表。多个客户端可能会请求完全相同的更新,因此每个进来的客户端都会被添加到相应数据的列表中。

如果他们取消,他们将通过以下方式从列表中删除:

if( responseObserver instanceof ServerCallStreamObserver<?> )
{
    ((ServerCallStreamObserver<?>) responseObserver).setOnCancelHandler( () ->
    {
        synchronized( _lastSnapshot )
        {
            _observers.remove( responseObserver );
        }                       
    } );
}

捕获 onNext 抛出的异常并简单地从列表中删除客户端是否明智?或者有没有更好的检测方法?或者是否有我需要解决的潜在问题?

我今天也遇到了这个错误。 因为我在调用 build() 后更改了构建器值。 错误:

SyncAepUserResponse.Builder builder = SyncAepUserResponse.newBuilder();
builder.setStatus(Status.newBuilder().setCode(200).setMessage("Successful"))
                    .setMapId(entity.getId()).build();
builder.setCode("200");
responseObserver.onNext(response);
responseObserver.onCompleted();

对:

SyncAepUserResponse.Builder builder = SyncAepUserResponse.newBuilder();
builder.setCode("200");
builder.setStatus(Status.newBuilder().setCode(200).setMessage("Successful"))
                    .setMapId(entity.getId()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();