与 StreamObserver 异步交互是否安全,即使用 Java 8+ CompletableFutures?
Is it safe to interact with StreamObserver asynchronously, i.e. with Java 8+ CompletableFutures?
我正在查看 Simple RPC example from grpc.io's basic tutorial:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
从其他线程与 StreamObserver 交互有什么注意事项吗? 例如,说 checkFeature()
异步命中另一个服务,返回 CompletableFuture:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> responseObserver.onNext(feature));
responseObserver.onCompleted();
}
当然,上面的方法是行不通的,因为第一个线程会在返回功能之前执行 onCompleted()
。所以让我们解决这个问题:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> {
responseObserver.onNext(feature);
responseObserver.onCompleted();
});
}
我认为这应该可行,但我是 Java 的新手,所以我想知道会有什么后果。例如,
Context.current()
会一致吗?
- 除了
onNext()
一元调用和 onError()
之外,还有什么会导致 StreamObserver 过早销毁或关闭吗?
- 有更好的做法吗?
如果有人也能指导我了解他们的推理方式,那就太好了。我尝试查找 StreamObserver
的实际实现,但我不确定要查找什么。
Will Context.current() be consistent?
Context.current()
正在使用 ThreadLocal
。如果您在不同的线程上访问它,它将不一致。您可以在线程之间传播上下文。您可能会发现 有用。
Will anything cause the StreamObserver to destruct or close prematurely besides onNext() for a unary calls and onError()?
是的,StreamObserver 的正常流程以 onError 或 onCompleted 结束。
如 StreamObserver
javadoc 所述,"Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls"。如果您并发调用 StreamObserver,则需要同步调用。换句话说,如果你确定即使你使用多线程也不会并发调用,那应该没问题。
如果在多个线程上访问同一个 StreamObserver,我会尝试同步它,除非性能很关键,因为它很容易出错。至少,它值得好评。
用thenAccept()
调用onNext()
和onCompleted()
就可以了,因为观察者不是多线程并发调用的
单独调用 onCompleted()
的 "broken" 示例也被破坏,因为它可以在没有任何形式的同步的情况下从多个线程调用观察者。 StreamObservers
不能同时从多个线程调用。
虽然使用 thenAccept()
不太正确,因为它不能处理 future 失败的情况。所以你还需要接收 Throwable
,这可以用 whenComplete()
:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
whenComplete((feature, t) -> {
if (t != null) {
responseObserver.onError(t);
} else {
responseObserver.onNext(feature);
responseObserver.onCompleted();
}
});
}
处理该 lambda 时,上下文很容易成为 "wrong"。通常我们会寻找 "architectural" 解决方案来确保传播上下文,例如在创建它们时将所有应用程序线程池包装在 Context.currentContextExecutor()
中,因此各个调用站点不需要关心传播。我对CompletableFuture
不够熟悉,无法提供策略。
我正在查看 Simple RPC example from grpc.io's basic tutorial:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
从其他线程与 StreamObserver 交互有什么注意事项吗? 例如,说 checkFeature()
异步命中另一个服务,返回 CompletableFuture:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> responseObserver.onNext(feature));
responseObserver.onCompleted();
}
当然,上面的方法是行不通的,因为第一个线程会在返回功能之前执行 onCompleted()
。所以让我们解决这个问题:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> {
responseObserver.onNext(feature);
responseObserver.onCompleted();
});
}
我认为这应该可行,但我是 Java 的新手,所以我想知道会有什么后果。例如,
Context.current()
会一致吗?- 除了
onNext()
一元调用和onError()
之外,还有什么会导致 StreamObserver 过早销毁或关闭吗? - 有更好的做法吗?
如果有人也能指导我了解他们的推理方式,那就太好了。我尝试查找 StreamObserver
的实际实现,但我不确定要查找什么。
Will Context.current() be consistent?
Context.current()
正在使用 ThreadLocal
。如果您在不同的线程上访问它,它将不一致。您可以在线程之间传播上下文。您可能会发现
Will anything cause the StreamObserver to destruct or close prematurely besides onNext() for a unary calls and onError()?
是的,StreamObserver 的正常流程以 onError 或 onCompleted 结束。
如 StreamObserver
javadoc 所述,"Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls"。如果您并发调用 StreamObserver,则需要同步调用。换句话说,如果你确定即使你使用多线程也不会并发调用,那应该没问题。
如果在多个线程上访问同一个 StreamObserver,我会尝试同步它,除非性能很关键,因为它很容易出错。至少,它值得好评。
用thenAccept()
调用onNext()
和onCompleted()
就可以了,因为观察者不是多线程并发调用的
单独调用 onCompleted()
的 "broken" 示例也被破坏,因为它可以在没有任何形式的同步的情况下从多个线程调用观察者。 StreamObservers
不能同时从多个线程调用。
虽然使用 thenAccept()
不太正确,因为它不能处理 future 失败的情况。所以你还需要接收 Throwable
,这可以用 whenComplete()
:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
whenComplete((feature, t) -> {
if (t != null) {
responseObserver.onError(t);
} else {
responseObserver.onNext(feature);
responseObserver.onCompleted();
}
});
}
处理该 lambda 时,上下文很容易成为 "wrong"。通常我们会寻找 "architectural" 解决方案来确保传播上下文,例如在创建它们时将所有应用程序线程池包装在 Context.currentContextExecutor()
中,因此各个调用站点不需要关心传播。我对CompletableFuture
不够熟悉,无法提供策略。