将服务重定向到双向 grpc

Redirecting service to bidirectional grpc

我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一台服务器。

我有 2 个 proto 文件第一个 proto

第一个文件:

syntax = "proto3";

package first.proto.pack;

service FirstProtoService {
  rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}

message RequestToFirstServer {
    oneof firstStreamingRequest {
        int32 x = 1;
        int32 y = 2;
    }
}

message ResponseForFirstServer {
  string someprocessedinformation = 1;
}

第二个文件:

syntax = "proto3";

package second.proto.pack;

service SecondProtoService {
  rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}

message RequestToSecondServer {
  oneof secondStreamingRequest {
    int32 processedX = 1;
    int32 procesdedY = 2;
  }
}

message ResponseFromSecondServer {
  string computedInformation = 1;
}

第一个服务器知道第一个 proto 文件,但不知道第二个。

第二个服务器知道第二个 proto 文件但不知道第一个。

中间服务器知道第一个和第二个协议。

需要编写一个服务器,将来自一台服务器的请求从一台服务器传输到另一台服务器

我是在 Java 上开始写的。但面临向第二台服务器发送大量请求的问题

我的服务中间实现在 Java 上的样子:

package middle.server.pack;


import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;

import java.util.logging.LogManager;
import java.util.logging.Logger;

public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
    private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
    private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());

    @Override
    public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
        return new StreamObserver<First.RequestToFirstServer>() {
            @Override
            public void onNext(First.RequestToFirstServer value) {
                SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
                StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
                        new StreamObserver<Second.ResponseFromSecondServer>() {
                            @Override
                            public void onNext(Second.ResponseFromSecondServer value) {
                                doProcessOnResponse(value);
                                First.ResponseForFirstServer responseForFirstServer =
                                        mapToFirstResponse(value);
                                responseObserver.onNext(responseForFirstServer);
                            }

                            @Override
                            public void onError(Throwable t) {
                                logger.info(t.getMessage());
                            }

                            @Override
                            public void onCompleted() {
                                logger.info("sucess");
                            }
                        }
                );
                Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
                requestObserver.onNext(requestToSecondServer);
                requestObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                logger.info(t.getMessage());
            }

            @Override
            public void onCompleted() {
                logger.info("Everything okay");
            }
        };
    }
}

从中间服务器端的第一个客户端请求后,我得到以下错误:

CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error

我知道我做错了。所以问题是如何让它正确,或者如果我不能在 java 上做到这一点,我可以用任何其他语言做到吗?

据我所知,问题是在 onNext() 中,每次 MiddleServiceFirstProtoServiceGrpc。即使它是来自第一台服务器的相同流,第一条消息也会创建自己的流到第二台服务器,第二条消息会创建自己的流到第二台服务器,依此类推。这就解释了为什么你会遇到“发送很多请求到第二个服务器。

相反,中间层应该反映第一个服务器的作用(在这个例子中,我们只关注第一个 -> 中间 -> 第二个方向)。当第一个服务器创建一个新的流 (1) 到中间时,中间创建一个新的流 (2) 到第二个。当中间服务器在流 (1) 上(从第一台服务器)收到一条消息时,它将它发送到流 (2)。当流 (1) 关闭时,流 (2) 也关闭。等等。

反之亦然,反之亦然

这里有几个问题:

  • 正如@SergiiTkachenko 所指出的,您为来自第一台服务器的每条消息创建一个新的 RPC 到第二台服务器。要解决此问题,请将对第二个服务器的调用移动到外部方法的开头 3 行。
  • requestObserver.onCompleted() 的调用应移至下面 StreamObserver<First.RequestToFirstServer> 几行中的 onCompleted()(在 logger.info("Everything okay"); 旁边)。
  • 你永远不会打电话给 responseObserver.onCompleted()。您应该在 StreamObserver<Second.ResponseFromSecondServer>onCompleted() 中这样做(在 logger.info("sucess"); 旁边)。
  • 您应该将从一台服务器收到的错误信号发送给另一台服务器(onError(...) 方法之间的关系应类似于 onCompleted()
  • 您应该使用 setOnCancelHandler(...) 处理第一台服务器的取消并将它们传播到第二台服务器。

所以一般来说(和 ),接收给定的回调(onNextonErroronCompleted、“onCancel”)其中一台服务器应该触发向另一台服务器发出相应的调用(在需要的地方“翻译”参数之后)。

最后,您应该使用 CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObservers to CallStreamObservers. More specifically, you should cast responseObserver to ServerCallStreamObserver and requestObserver to ClientCallStreamObserver 中的方法尊重两台服务器的准备情况。
这个应该交叉实现:

  • 如果其他服务器 isReady.
  • ,您应该 request(1) 在处理来自它的消息(在 onNext() 中)结束时来自给定服务器的消息
  • 从一台服务器接收“onReady”回调应该触发来自另一台服务器的 request(...)-ing 1 条消息。

据我所知,从外部方法返回观察者后,您应该会收到来自两个服务器的初始“onReady”回调,这将使一切都开始运转。但是我不是 100% 确定来自第二个服务器的回调并且目前无法验证它:如果你没有收到它的初始回调,只需在返回观察者之前从第一个服务器请求一条初始消息。