如何在不使用额外状态变量的情况下跟踪请求 issued/completed? (Java/Grpc)
How do I keep track of requests issued/completed without the use of additional state variables? (Java/Grpc)
我正在使用 grpc-java 项目中的 StreamObserver class 来设置一些双向流。
当我 运行 我的程序时,我向服务器发出了不确定数量的请求,我只想在完成所有请求后调用 requestObserver 上的 onCompleted()。
目前,为了解决这个问题,我使用变量 "inFlight" 来跟踪已发出的请求,当响应返回时,我递减 "inFlight"。所以,像这样。
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
// response observer
StreamObserver<Message> responseObserver = new StreamObserver<Message> {
@Override
public void onNext(Message response) {
if (--this.onFlight == 0) {
this.requestObserver.onCompleted();
}
// work on message
}
// other methods
}
有点伪代码,但这个逻辑有效。但是,如果可能的话,我想去掉 "inFlight" 变量。 StreamObserver class 中是否有任何允许这种功能的东西,而不需要额外的变量来跟踪状态?会告诉发出的请求数量和完成时间的东西。
我已经尝试在 intellij IDE 调试器中检查该对象,但没有弹出任何内容。
要回答您的直接问题,您只需从 while 循环中调用 onComplete 即可。传递给 onNext
的所有消息。在底层,gRPC 将发送所谓的 "half close",表明它不会再发送任何消息,但愿意接收它们。具体来说:
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
requestObserver.onCompleted();
这可以确保所有回复都按照您发送的顺序发送。在服务器端,当它看到相应的 onCompleted
回调时,它可以通过在其观察者上调用 onComplete 来半关闭其连接端。 (服务器端有两个观察者,一个用于从客户端接收信息,一个用于发送信息)。
回到客户端,只需要等待服务器半关闭就可以知道所有的消息都被接收和处理了。请注意,如果有任何错误,您将收到 onError
回调。
如果您不知道您将在客户端发出多少请求,您可以考虑使用AtomicInteger
,并调用decrementAndGet
当你得到回复时。如果 return 值为 0
,您将知道所有请求已完成。
我正在使用 grpc-java 项目中的 StreamObserver class 来设置一些双向流。
当我 运行 我的程序时,我向服务器发出了不确定数量的请求,我只想在完成所有请求后调用 requestObserver 上的 onCompleted()。
目前,为了解决这个问题,我使用变量 "inFlight" 来跟踪已发出的请求,当响应返回时,我递减 "inFlight"。所以,像这样。
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
// response observer
StreamObserver<Message> responseObserver = new StreamObserver<Message> {
@Override
public void onNext(Message response) {
if (--this.onFlight == 0) {
this.requestObserver.onCompleted();
}
// work on message
}
// other methods
}
有点伪代码,但这个逻辑有效。但是,如果可能的话,我想去掉 "inFlight" 变量。 StreamObserver class 中是否有任何允许这种功能的东西,而不需要额外的变量来跟踪状态?会告诉发出的请求数量和完成时间的东西。
我已经尝试在 intellij IDE 调试器中检查该对象,但没有弹出任何内容。
要回答您的直接问题,您只需从 while 循环中调用 onComplete 即可。传递给 onNext
的所有消息。在底层,gRPC 将发送所谓的 "half close",表明它不会再发送任何消息,但愿意接收它们。具体来说:
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
requestObserver.onCompleted();
这可以确保所有回复都按照您发送的顺序发送。在服务器端,当它看到相应的 onCompleted
回调时,它可以通过在其观察者上调用 onComplete 来半关闭其连接端。 (服务器端有两个观察者,一个用于从客户端接收信息,一个用于发送信息)。
回到客户端,只需要等待服务器半关闭就可以知道所有的消息都被接收和处理了。请注意,如果有任何错误,您将收到 onError
回调。
如果您不知道您将在客户端发出多少请求,您可以考虑使用AtomicInteger
,并调用decrementAndGet
当你得到回复时。如果 return 值为 0
,您将知道所有请求已完成。