ClientCallStreamObserver isReady 从不 returns true
ClientCallStreamObserver isReady never returns true
我正在制作一个输入流速率计。它基本上是一种公开请求流调用并计算每秒可处理的消息数的服务。
由于客户端在发送消息时是完全异步的,我使用 ClientCallStreamObserver 在流就绪时开始发送消息,以避免内存溢出。
客户端代码如下所示:
public static void main(String[] args) throws Exception {
ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);
StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
@Override
public void onNext(Empty empty) {
}
@Override
public void onError(Throwable throwable) {
logger.info("on error response stream");
}
@Override
public void onCompleted() {
logger.info("on completed response stream");
}
});
final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;
while (!clientCallObserver.isReady()) {
Thread.sleep(2000);
logger.info("stream not ready yet");
}
counter.setLastTic(System.nanoTime());
while (true) {
counter.inc();
if (counter.getCounter() % 15000 == 0 ) {
long now = System.nanoTime();
double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
logger.info("rate: " + rate + " msgs per sec");
counter.clear();
counter.setLastTic(now);
}
inputStream.onNext(createRandomTrade());
}
}
我对 isReady 的观察循环永无止境。
OBS:我正在使用 kubernetes 集群来为我的测试服务,服务器正在接收调用并返回一个 StreamObserver 实现。
isReady
最终应该 return true
,只要 RPC 没有立即 error/complete。但是代码没有正确观察流量控制。
每次调用 onNext()
发送请求后 isReady()
可以开始 returning false
。您的 while (true)
循环应该在每次迭代开始时进行 isReady()
检查。
与其轮询,不如调用 serverCallObserver.setOnReadyHandler(yourRunnable)
以便在调用准备好发送时得到通知。请注意,您仍应在 yourRunnable
内检查 isReady()
,因为可能会有 spurious/out-of-date 条通知。
我正在制作一个输入流速率计。它基本上是一种公开请求流调用并计算每秒可处理的消息数的服务。
由于客户端在发送消息时是完全异步的,我使用 ClientCallStreamObserver 在流就绪时开始发送消息,以避免内存溢出。
客户端代码如下所示:
public static void main(String[] args) throws Exception {
ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);
StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
@Override
public void onNext(Empty empty) {
}
@Override
public void onError(Throwable throwable) {
logger.info("on error response stream");
}
@Override
public void onCompleted() {
logger.info("on completed response stream");
}
});
final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;
while (!clientCallObserver.isReady()) {
Thread.sleep(2000);
logger.info("stream not ready yet");
}
counter.setLastTic(System.nanoTime());
while (true) {
counter.inc();
if (counter.getCounter() % 15000 == 0 ) {
long now = System.nanoTime();
double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
logger.info("rate: " + rate + " msgs per sec");
counter.clear();
counter.setLastTic(now);
}
inputStream.onNext(createRandomTrade());
}
}
我对 isReady 的观察循环永无止境。
OBS:我正在使用 kubernetes 集群来为我的测试服务,服务器正在接收调用并返回一个 StreamObserver 实现。
isReady
最终应该 return true
,只要 RPC 没有立即 error/complete。但是代码没有正确观察流量控制。
每次调用 onNext()
发送请求后 isReady()
可以开始 returning false
。您的 while (true)
循环应该在每次迭代开始时进行 isReady()
检查。
与其轮询,不如调用 serverCallObserver.setOnReadyHandler(yourRunnable)
以便在调用准备好发送时得到通知。请注意,您仍应在 yourRunnable
内检查 isReady()
,因为可能会有 spurious/out-of-date 条通知。