Vertx GRPC Stream Observer

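I have implemented a gRPC bidirectional streaming service using Vert.x. After receiving the responses from the server, I have to run another process based on them. To do that I called requestStreamObserver.onCompleted(), but after this call I get

Stream is already completed, no further calls are allowed.

from the server side.

So my question is: how can I read the response stream and run that process without terminating the server stream?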

Proto file

syntax = "proto3";

service userService{
    rpc getData(stream Request) returns(stream Response);
}

message Request{
    int32 id = 1;
}

message Response{
    int32 responseCode = 1;
    string username = 2;
}

Client implementation

public void bidi() {

    List<String> responseList = new ArrayList<>();

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
        @Override
        public void onNext(Response response) {
            System.out.println(response.getResponseCode() + " - " + response.getUsername());
            responseList.add(response.getUsername());
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Error " + throwable.getMessage());
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            System.out.println("completed stream");
            finishLatch.countDown();
        }
    };

    StreamObserver<Request> requestStreamObserver = nonBlockingStub.requestData(responseStreamObserver);

    try {
        for (int i = 1; i < 6; i++) {
            Request request = Request.newBuilder().setId(i).build();
            requestStreamObserver.onNext(request);
            if (finishLatch.getCount() == 0) {
                return;
            }
        }            
    } catch (Exception e) {
        e.printStackTrace();
    }

    requestStreamObserver.onCompleted();
    try {
        finishLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //These code blocks need to be executed after completing the response
    for(String data:responseList){
        System.out.println(data);
    }
    doStuff();
}

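Since I don't have access to the server side, I can't post the server implementation, but the call accepts a stream of requests and returns a stream of responses from the server.

Thanks.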

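This is how I solved the problem.

I used an AtomicInteger and incremented it each time a request is sent. The response observer then decrements the counter each time a response arrives. When the counter reaches 0, it calls CountDownLatch.countDown() and the remaining work runs. Note that requestStreamObserver.onCompleted() is no longer called, so the server stream stays open.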

public void bidi() {

    List<String> responseList = new ArrayList<>();
    AtomicInteger counter = new AtomicInteger();

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
        @Override
        public void onNext(Response response) {
            System.out.println(response.getResponseCode() + " - " + response.getUsername());
            responseList.add(response.getUsername());
            // Decrement and test in one atomic step
            if (counter.decrementAndGet() == 0) {
                finishLatch.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Error " + throwable.getMessage());
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            System.out.println("completed stream");
            finishLatch.countDown();
        }
    };

    StreamObserver<Request> requestStreamObserver = nonBlockingStub.requestData(responseStreamObserver);

    try {
        for (int i = 1; i < 6; i++) {
            Request request = Request.newBuilder().setId(i).build();
            counter.incrementAndGet(); // increment before sending so a fast response cannot see a stale count
            requestStreamObserver.onNext(request);
            if (finishLatch.getCount() == 0) {
                return;
            }
        }            
    } catch (Exception e) {
        e.printStackTrace();
    }

    try {
        finishLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    for(String data:responseList){
        System.out.println(data);
    }
    doStuff();
}
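
One caveat with the counter approach above: if the server answers the first request before the second one is sent, the counter can reach 0 while the loop is still running, and the latch is released too early. A minimal sketch of one way around this is shown below, assuming the server sends exactly one response per request. `InFlightTracker` and its method names are hypothetical and are not part of gRPC or Vert.x. The idea is to start the counter at 1 so the sender holds an extra permit, which it releases only after the last request has been written.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of gRPC or Vert.x): tracks requests that
 * have been sent but not yet answered. Assumes one response per request.
 */
final class InFlightTracker {
    // Starts at 1: the extra permit belongs to the sender and is released
    // by doneSending(), so the count cannot reach 0 while requests are
    // still being written.
    private final AtomicInteger pending = new AtomicInteger(1);
    private final CountDownLatch done = new CountDownLatch(1);

    /** Call immediately before requestStreamObserver.onNext(request). */
    void requestSent() {
        pending.incrementAndGet();
    }

    /** Call from the response observer's onNext. */
    void responseReceived() {
        release();
    }

    /** Call once, after the last request has been written. */
    void doneSending() {
        release();
    }

    /** Call from onError or onCompleted to unblock the waiting thread. */
    void abort() {
        done.countDown();
    }

    /** Blocks until all responses arrived, abort() was called, or the timeout elapsed. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    /** True once the latch has been released. */
    boolean isDone() {
        return done.getCount() == 0;
    }

    private void release() {
        // Decrement and test in one atomic step.
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
    }
}
```

In bidi(), this would mean calling requestSent() before each requestStreamObserver.onNext(request), doneSending() after the loop, responseReceived() in the response observer's onNext, and abort() in onError and onCompleted, then waiting with await(...) before reading responseList and calling doStuff(). Using a timeout on the wait avoids blocking forever if the server sends fewer responses than expected.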