如何在不使用额外状态变量的情况下跟踪请求 issued/completed? (Java/Grpc)

How do I keep track of requests issued/completed without the use of additional state variables? (Java/Grpc)

我正在使用 grpc-java 项目中的 StreamObserver class 来设置一些双向流。

当我 运行 我的程序时,我向服务器发出了不确定数量的请求,我只想在完成所有请求后调用 requestObserver 上的 onCompleted()。

目前,为了解决这个问题,我使用变量 "inFlight" 来跟踪已发出的请求,当响应返回时,我递减 "inFlight"。所以,像这样。

// issuing requests
while (haveRequests) {
    MessageRequest request = mkRequest();
    this.requestObserver.onNext(request);
    this.inFlight++; 
}


// response observer
StreamObserver<Message> responseObserver = new StreamObserver<Message> {

    @Override
    public void onNext(Message response) {
        if (--this.onFlight == 0) {
            this.requestObserver.onCompleted();
        }

        // work on message
    }

    // other methods

}

有点伪代码,但这个逻辑有效。但是,如果可能的话,我想去掉 "inFlight" 变量。 StreamObserver class 中是否有任何允许这种功能的东西,而不需要额外的变量来跟踪状态?会告诉发出的请求数量和完成时间的东西。

我已经尝试在 intellij IDE 调试器中检查该对象,但没有弹出任何内容。

要回答您的直接问题,您只需从 while 循环中调用 onComplete 即可。传递给 onNext 的所有消息。在底层,gRPC 将发送所谓的 "half close",表明它不会再发送任何消息,但愿意接收它们。具体来说:

// issuing requests
while (haveRequests) {
    MessageRequest request = mkRequest();
    this.requestObserver.onNext(request);
    this.inFlight++; 
}
requestObserver.onCompleted();

这可以确保所有回复都按照您发送的顺序发送。在服务器端,当它看到相应的 onCompleted 回调时,它可以通过在其观察者上调用 onComplete 来半关闭其连接端。 (服务器端有两个观察者,一个用于从客户端接收信息,一个用于发送信息)。

回到客户端,只需要等待服务器半关闭就可以知道所有的消息都被接收和处理了。请注意,如果有任何错误,您将收到 onError 回调。

如果您知道您将在客户端发出多少请求,您可以考虑使用AtomicInteger,并调用decrementAndGet 当你得到回复时。如果 return 值为 0,您将知道所有请求已完成。