如何管理 gRPC-java 服务器方法中的阻塞代码?
How can I manage blocking code inside gRPC-java server method?
假设我有一个 grpc-java 服务器,代码如下:
@Override
public void getData(RequestValue requestValue, StreamObserver<ResponseValue>responseObserver) {
ResponseValue rv = ... // blocking code here
responseObserver.onNext(rv);
responseObserver.onCompleted();
}
因此,由于阻塞代码(来自数据库或其他服务的数据),我有一个 responseValue。
我想避免使用另一个线程池来阻塞当前线程来执行阻塞任务。例如,在 Netty 中,我可以使用特定的 EventExecutorGroup 来完成此类任务。
如何使用 grpc-java 服务正确管理它?
最简单的方法是将 responseObserver 传递给长 运行 任务:
@Override
public void getData(RequestValue requestValue, StreamObserver<ResponseValue> responseObserver) {
Runnable r = () -> {
try {
ResponseValue rv = ... // blocking code here
responseObserver.onNext(rv);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
executor.schedule(r);
}
请务必在某个时间完成通话,即使出现意外错误也是如此。否则你会泄漏调用(在超时发生之前保持打开状态,如果有的话)。
假设我有一个 grpc-java 服务器,代码如下:
@Override
public void getData(RequestValue requestValue, StreamObserver<ResponseValue>responseObserver) {
ResponseValue rv = ... // blocking code here
responseObserver.onNext(rv);
responseObserver.onCompleted();
}
因此,由于阻塞代码(来自数据库或其他服务的数据),我有一个 responseValue。
我想避免使用另一个线程池来阻塞当前线程来执行阻塞任务。例如,在 Netty 中,我可以使用特定的 EventExecutorGroup 来完成此类任务。
如何使用 grpc-java 服务正确管理它?
最简单的方法是将 responseObserver 传递给长 运行 任务:
@Override
public void getData(RequestValue requestValue, StreamObserver<ResponseValue> responseObserver) {
Runnable r = () -> {
try {
ResponseValue rv = ... // blocking code here
responseObserver.onNext(rv);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
executor.schedule(r);
}
请务必在某个时间完成通话,即使出现意外错误也是如此。否则你会泄漏调用(在超时发生之前保持打开状态,如果有的话)。