gRPC 流被取消

gRPC stream become canceled

我正在尝试通过 gRPC 创建流 API,但我的 StreamObserver 由于某些奇怪的原因被取消了。 这是我的 .proto 声明:

service Service {
    rpc connect (ConnectionRequest) returns (stream StreamResponse) {}

    rpc act (stream ActRequest) returns (ActResponse) {}
}

想法是用户将 connect 所以 StreamResponse 将为每个用户保存,然后在每个 actStreamResponse 将收到更新.这是 Service

的 java 实现
class ServiceImpl extends ServiceGrpc.ServiceImplBase {
    ............
    @Override
    public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
        observers.add(responseObserver);
    }

    @Override
    public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
        return return new StreamObserver<ActRequest>() {
            @Override
            public void onNext(ActRequest actRequest) {
                StreamResponse streamResponse = StreamResponse.newBuilder().build();
                observers.forEach(o -> o.onNext(streamResponse));
                actRequest..onCompleted();
            }
            ...............
        };
    }
    ..................
}

这是客户端 connect 方法:

ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
new StreamObserver<PlayerResponse>() {
    ........
    @Override
    public void onNext(StreamResponse streamResponse) {
          logger.info("Act: " streamResponse.getData());
    }
    ........
}
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);

和客户端act方法:

StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
    @Override
    public void onNext(ActResponse actResponse) {
        logger.info("Status: " + actResponse.getSuccess());
    }
    .........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(MoveRequest.newBuilder().build());
act.onCompleted();

当我同时启动客户端和服务器时,客户端可以调用connect。但是方法act只能第一次调用。当客户端第二次调用 act 时,我收到以下异常:

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
    at io.grpc.Status.asRuntimeException(Status.java:517)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)

在调试模式下我可以看到 StreamObserver<StreamResponse> responseObservercanceled=true

我找到了为什么会出现这个错误。在客户端中,我使用 Vaadin 构建 UI。所以在 StreamObserver<ActResponse>#onNext 上我调用了一个抛出异常的 Vaadin 方法。

我在开发时犯的错误是我将 onError 留空,因为它是一个 PoC。但是这个错误让我调试了几个小时。 这是一个愚蠢的错误,但我会把它放在这里,也许它可以为某人节省一些时间。