grpc-java:正确处理客户端上的服务流调用重试
grpc-java: Proper handling of retry on client for service streaming call
我正在尝试使用服务流和客户端上的异步存根在 grpc 上设置一个简单的 pubslish/subscribe 模式。在实现部分流消息返回客户端后,我想处理连接断开的情况。现在我正在实施部分服务关闭时的部分,例如客户端应该 'recover' 连接丢失。
我已经阅读并搜索了 google/github/so 上的重试机制,并最终为流式传输消息的服务中的方法设置了重试策略。据我了解,重试机制应该在服务 returns 重试策略中定义的一些 retryableStatusCodes 时起作用。在客户端引入重试策略后,我想测试一下,下面两个场景的结果让我对重试感到困惑。
第一种情况:
- connect 过程被调用(~n 秒后故意没有消息流回客户端)
- 服务已关闭
- onError 未在客户端
上调用
- 服务再次启动
- 连接 重试
第二种情况:
- connect 过程被调用(~n 秒后第一条消息到达,消息在客户端的 onNext 处理程序中处理)
- 服务已关闭
- onError 是 客户端调用
- 服务再次启动
- 连接未重试
总的来说,让我感到困惑的是为什么这两种情况之间的行为存在差异?为什么在第一种情况下检测到服务器返回不可用并尝试重试,但在第二种情况下即使状态相同,重试也不起作用?
这里是 connect 客户端调用,connect 服务方法,以及客户端重试策略设置的代码
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error(throwable.getStackTrace());
}
@Override
public void onCompleted() {
//This method should be called when user logs out of the application
LOGGER.info(String.format("Message streaming terminated for user %d", userId));
}
});
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
Long userId = request.getUserId();
ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
(ServerCallStreamObserver<MessageResponse >) responseObserver;
serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
registerClient(userId, serverCallStreamObserver);
//responseObserver.onCompleted() is left out so connection is not terminated
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
//omitted code (just some data retrieving - populate conn and message vars)....
MessageResponse.Builder builder = MessageResponse.newBuilder();
StreamObserver<MessageResponse> observer = conn.getResponseObserver();
builder.setType(message.getType());
builder.setTitle(message.getTitle());
builder.setBody(message.getBody());
observer.onNext(builder.build())
}
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}
问题是接收消息提交 RPC。 gRFC A6 Client Retries 中对此进行了讨论。它提到 Response-Headers
,当服务器响应第一条消息时隐式发送。
本质上,一旦 gRPC 将数据传回客户端,就无法自动重试。如果 gRPC 重试,它应该如何将新流与它已经响应的流结合起来?它应该跳过第一个 N
响应吗?但是,如果现在的反应不同了呢?对于元数据(通过 Response-Headers
交付)来说,问题更严重,因为它们无法再次提供给客户端。
gRPC 能够将客户端的 请求 重播到多个后端,但是一旦它开始从后端接收响应,它将成为 "fixed" 到该后端并且是无法改变其决定。
您需要应用程序级重试才能重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。
我正在尝试使用服务流和客户端上的异步存根在 grpc 上设置一个简单的 pubslish/subscribe 模式。在实现部分流消息返回客户端后,我想处理连接断开的情况。现在我正在实施部分服务关闭时的部分,例如客户端应该 'recover' 连接丢失。
我已经阅读并搜索了 google/github/so 上的重试机制,并最终为流式传输消息的服务中的方法设置了重试策略。据我了解,重试机制应该在服务 returns 重试策略中定义的一些 retryableStatusCodes 时起作用。在客户端引入重试策略后,我想测试一下,下面两个场景的结果让我对重试感到困惑。
第一种情况:
- connect 过程被调用(~n 秒后故意没有消息流回客户端)
- 服务已关闭
- onError 未在客户端 上调用
- 服务再次启动
- 连接 重试
第二种情况:
- connect 过程被调用(~n 秒后第一条消息到达,消息在客户端的 onNext 处理程序中处理)
- 服务已关闭
- onError 是 客户端调用
- 服务再次启动
- 连接未重试
总的来说,让我感到困惑的是为什么这两种情况之间的行为存在差异?为什么在第一种情况下检测到服务器返回不可用并尝试重试,但在第二种情况下即使状态相同,重试也不起作用?
这里是 connect 客户端调用,connect 服务方法,以及客户端重试策略设置的代码
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error(throwable.getStackTrace());
}
@Override
public void onCompleted() {
//This method should be called when user logs out of the application
LOGGER.info(String.format("Message streaming terminated for user %d", userId));
}
});
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
Long userId = request.getUserId();
ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
(ServerCallStreamObserver<MessageResponse >) responseObserver;
serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
registerClient(userId, serverCallStreamObserver);
//responseObserver.onCompleted() is left out so connection is not terminated
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
//omitted code (just some data retrieving - populate conn and message vars)....
MessageResponse.Builder builder = MessageResponse.newBuilder();
StreamObserver<MessageResponse> observer = conn.getResponseObserver();
builder.setType(message.getType());
builder.setTitle(message.getTitle());
builder.setBody(message.getBody());
observer.onNext(builder.build())
}
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}
问题是接收消息提交 RPC。 gRFC A6 Client Retries 中对此进行了讨论。它提到 Response-Headers
,当服务器响应第一条消息时隐式发送。
本质上,一旦 gRPC 将数据传回客户端,就无法自动重试。如果 gRPC 重试,它应该如何将新流与它已经响应的流结合起来?它应该跳过第一个 N
响应吗?但是,如果现在的反应不同了呢?对于元数据(通过 Response-Headers
交付)来说,问题更严重,因为它们无法再次提供给客户端。
gRPC 能够将客户端的 请求 重播到多个后端,但是一旦它开始从后端接收响应,它将成为 "fixed" 到该后端并且是无法改变其决定。
您需要应用程序级重试才能重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。