如果没有服务器,GRPC 客户端 onNext 不会失败
GRPC client onNext does not fail if there is no server
我有一个简单的 gRPC 客户端如下:
/**
* Client that calls gRPC.
*/
public class Client {
private static final Context.Key<String> URI_CONTEXT_KEY =
Context.key(Constants.URI_HEADER_KEY);
private final ManagedChannel channel;
private final DoloresRPCStub asyncStub;
/**
* Construct client for accessing gRPC server at {@code host:port}.
* @param host
* @param port
*/
public Client(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
}
/**
* Construct client for accessing gRPC server using the existing channel.
* @param channelBuilder {@link ManagedChannelBuilder} instance
*/
public Client(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
asyncStub = DoloresRPCGrpc.newStub(channel);
}
/**
* Closes the client
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* Main async method for communication between client and server
* @param responseObserver user's {@link StreamObserver} implementation to handle
* responses received from the server.
* @return {@link StreamObserver} instance to provide requests into
*/
public StreamObserver<Request> downloading(StreamObserver<Response> responseObserver) {
return asyncStub.downloading(responseObserver);
}
public static void main(String[] args) {
Client cl = new Client("localhost", 8999); // fail??
StreamObserver<Request> requester = cl.downloading(new StreamObserver<Response>() {
@Override
public void onNext(Response value) {
System.out.println("On Next");
}
@Override
public void onError(Throwable t) {
System.out.println("Error");
}
@Override
public void onCompleted() {
System.out.println("Completed");
}
}); // fail ??
System.out.println("Start");
requester.onNext(Request.newBuilder().setUrl("http://my-url").build()); // fail?
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
System.out.println("Finish");
}
}
我没有启动任何服务器和 运行 main
方法。我假设程序失败于:
- 创建客户端
- client.downloading 通话
- 或observer.onNext
但令我惊讶的是(对我而言),代码 运行 成功了,只有消息丢失了。输出是:
Start
Finish
Error
由于异步性质,甚至可以在至少通过响应观察器传播错误之前调用完成。这是一种理想的行为吗?我不能丢失任何消息。我错过了什么吗?
谢谢你,亚当
这是预期的行为。正如您提到的 API 是异步的,因此错误通常也必须是异步的。 gRPC 不保证消息传递,并且在流式 RPC 失败的情况下不指示远程端收到了哪些消息。高级ClientCall API calls this out.
如果您需要更强的保证,则必须在应用程序级别添加它,例如回复或 OK
的状态。例如,在 中我提到使用双向流进行确认。
创建 ManagedChannelBuilder
不会出错,因为通道是惰性的:它只在必要时创建 TCP 连接(并在必要时重新连接)。此外,由于大多数故障都是暂时的,我们不希望仅仅因为您的客户端恰好在网络中断时启动而阻止通道上所有未来的 RPC。
由于 API 已经是异步的,grpc-java 可以在发送时有目的地丢弃消息,即使它知道发生了错误(即,它选择不抛出)。因此几乎所有错误都通过 onError()
.
传递给应用程序
我有一个简单的 gRPC 客户端如下:
/**
* Client that calls gRPC.
*/
public class Client {
private static final Context.Key<String> URI_CONTEXT_KEY =
Context.key(Constants.URI_HEADER_KEY);
private final ManagedChannel channel;
private final DoloresRPCStub asyncStub;
/**
* Construct client for accessing gRPC server at {@code host:port}.
* @param host
* @param port
*/
public Client(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true));
}
/**
* Construct client for accessing gRPC server using the existing channel.
* @param channelBuilder {@link ManagedChannelBuilder} instance
*/
public Client(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
asyncStub = DoloresRPCGrpc.newStub(channel);
}
/**
* Closes the client
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* Main async method for communication between client and server
* @param responseObserver user's {@link StreamObserver} implementation to handle
* responses received from the server.
* @return {@link StreamObserver} instance to provide requests into
*/
public StreamObserver<Request> downloading(StreamObserver<Response> responseObserver) {
return asyncStub.downloading(responseObserver);
}
public static void main(String[] args) {
Client cl = new Client("localhost", 8999); // fail??
StreamObserver<Request> requester = cl.downloading(new StreamObserver<Response>() {
@Override
public void onNext(Response value) {
System.out.println("On Next");
}
@Override
public void onError(Throwable t) {
System.out.println("Error");
}
@Override
public void onCompleted() {
System.out.println("Completed");
}
}); // fail ??
System.out.println("Start");
requester.onNext(Request.newBuilder().setUrl("http://my-url").build()); // fail?
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
requester.onNext(Request.newBuilder().setUrl("http://my-url").build());
System.out.println("Finish");
}
}
我没有启动任何服务器和 运行 main
方法。我假设程序失败于:
- 创建客户端
- client.downloading 通话
- 或observer.onNext
但令我惊讶的是(对我而言),代码 运行 成功了,只有消息丢失了。输出是:
Start
Finish
Error
由于异步性质,甚至可以在至少通过响应观察器传播错误之前调用完成。这是一种理想的行为吗?我不能丢失任何消息。我错过了什么吗?
谢谢你,亚当
这是预期的行为。正如您提到的 API 是异步的,因此错误通常也必须是异步的。 gRPC 不保证消息传递,并且在流式 RPC 失败的情况下不指示远程端收到了哪些消息。高级ClientCall API calls this out.
如果您需要更强的保证,则必须在应用程序级别添加它,例如回复或 OK
的状态。例如,在
创建 ManagedChannelBuilder
不会出错,因为通道是惰性的:它只在必要时创建 TCP 连接(并在必要时重新连接)。此外,由于大多数故障都是暂时的,我们不希望仅仅因为您的客户端恰好在网络中断时启动而阻止通道上所有未来的 RPC。
由于 API 已经是异步的,grpc-java 可以在发送时有目的地丢弃消息,即使它知道发生了错误(即,它选择不抛出)。因此几乎所有错误都通过 onError()
.