将服务重定向到双向 grpc
Redirecting service to bidirectional grpc
我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一台服务器。
我有 2 个 proto 文件第一个 proto
第一个文件:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
oneof firstStreamingRequest {
int32 x = 1;
int32 y = 2;
}
}
message ResponseForFirstServer {
string someprocessedinformation = 1;
}
第二个文件:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
oneof secondStreamingRequest {
int32 processedX = 1;
int32 procesdedY = 2;
}
}
message ResponseFromSecondServer {
string computedInformation = 1;
}
第一个服务器知道第一个 proto 文件,但不知道第二个。
第二个服务器知道第二个 proto 文件但不知道第一个。
中间服务器知道第一个和第二个协议。
需要编写一个服务器,将来自一台服务器的请求从一台服务器传输到另一台服务器
我是在 Java 上开始写的。但面临向第二台服务器发送大量请求的问题
我的服务中间实现在 Java 上的样子:
package middle.server.pack;
import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;
import java.util.logging.LogManager;
import java.util.logging.Logger;
public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());
@Override
public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
return new StreamObserver<First.RequestToFirstServer>() {
@Override
public void onNext(First.RequestToFirstServer value) {
SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
new StreamObserver<Second.ResponseFromSecondServer>() {
@Override
public void onNext(Second.ResponseFromSecondServer value) {
doProcessOnResponse(value);
First.ResponseForFirstServer responseForFirstServer =
mapToFirstResponse(value);
responseObserver.onNext(responseForFirstServer);
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("sucess");
}
}
);
Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
requestObserver.onNext(requestToSecondServer);
requestObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("Everything okay");
}
};
}
}
从中间服务器端的第一个客户端请求后,我得到以下错误:
CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error
我知道我做错了。所以问题是如何让它正确,或者如果我不能在 java 上做到这一点,我可以用任何其他语言做到吗?
据我所知,问题是在 onNext()
中,每次 MiddleService
从FirstProtoServiceGrpc
。即使它是来自第一台服务器的相同流,第一条消息也会创建自己的流到第二台服务器,第二条消息会创建自己的流到第二台服务器,依此类推。这就解释了为什么你会遇到“发送很多请求到第二个服务器。
相反,中间层应该反映第一个服务器的作用(在这个例子中,我们只关注第一个 -> 中间 -> 第二个方向)。当第一个服务器创建一个新的流 (1) 到中间时,中间创建一个新的流 (2) 到第二个。当中间服务器在流 (1) 上(从第一台服务器)收到一条消息时,它将它发送到流 (2)。当流 (1) 关闭时,流 (2) 也关闭。等等。
反之亦然,反之亦然
这里有几个问题:
- 正如@SergiiTkachenko 所指出的,您为来自第一台服务器的每条消息创建一个新的 RPC 到第二台服务器。要解决此问题,请将对第二个服务器的调用移动到外部方法的开头 3 行。
- 对
requestObserver.onCompleted()
的调用应移至下面 StreamObserver<First.RequestToFirstServer>
几行中的 onCompleted()
(在 logger.info("Everything okay");
旁边)。
- 你永远不会打电话给
responseObserver.onCompleted()
。您应该在 StreamObserver<Second.ResponseFromSecondServer>
的 onCompleted()
中这样做(在 logger.info("sucess");
旁边)。
- 您应该将从一台服务器收到的错误信号发送给另一台服务器(
onError(...)
方法之间的关系应类似于 onCompleted()
)
- 您应该使用 setOnCancelHandler(...) 处理第一台服务器的取消并将它们传播到第二台服务器。
所以一般来说(和 ),接收给定的回调(onNext
、onError
、onCompleted
、“onCancel
”)其中一台服务器应该触发向另一台服务器发出相应的调用(在需要的地方“翻译”参数之后)。
最后,您应该使用 CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObserver
s to CallStreamObserver
s. More specifically, you should cast responseObserver
to ServerCallStreamObserver and requestObserver
to ClientCallStreamObserver 中的方法尊重两台服务器的准备情况。
这个应该交叉实现:
- 如果其他服务器
isReady
. ,您应该 request(1)
在处理来自它的消息(在 onNext()
中)结束时来自给定服务器的消息
- 从一台服务器接收“
onReady
”回调应该触发来自另一台服务器的 request(...)
-ing 1 条消息。
据我所知,从外部方法返回观察者后,您应该会收到来自两个服务器的初始“onReady
”回调,这将使一切都开始运转。但是我不是 100% 确定来自第二个服务器的回调并且目前无法验证它:如果你没有收到它的初始回调,只需在返回观察者之前从第一个服务器请求一条初始消息。
我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一台服务器。
我有 2 个 proto 文件第一个 proto
第一个文件:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
oneof firstStreamingRequest {
int32 x = 1;
int32 y = 2;
}
}
message ResponseForFirstServer {
string someprocessedinformation = 1;
}
第二个文件:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
oneof secondStreamingRequest {
int32 processedX = 1;
int32 procesdedY = 2;
}
}
message ResponseFromSecondServer {
string computedInformation = 1;
}
第一个服务器知道第一个 proto 文件,但不知道第二个。
第二个服务器知道第二个 proto 文件但不知道第一个。
中间服务器知道第一个和第二个协议。
需要编写一个服务器,将来自一台服务器的请求从一台服务器传输到另一台服务器
我是在 Java 上开始写的。但面临向第二台服务器发送大量请求的问题
我的服务中间实现在 Java 上的样子:
package middle.server.pack;
import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;
import java.util.logging.LogManager;
import java.util.logging.Logger;
public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());
@Override
public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
return new StreamObserver<First.RequestToFirstServer>() {
@Override
public void onNext(First.RequestToFirstServer value) {
SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
new StreamObserver<Second.ResponseFromSecondServer>() {
@Override
public void onNext(Second.ResponseFromSecondServer value) {
doProcessOnResponse(value);
First.ResponseForFirstServer responseForFirstServer =
mapToFirstResponse(value);
responseObserver.onNext(responseForFirstServer);
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("sucess");
}
}
);
Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
requestObserver.onNext(requestToSecondServer);
requestObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("Everything okay");
}
};
}
}
从中间服务器端的第一个客户端请求后,我得到以下错误:
CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error
我知道我做错了。所以问题是如何让它正确,或者如果我不能在 java 上做到这一点,我可以用任何其他语言做到吗?
据我所知,问题是在 onNext()
中,每次 MiddleService
从FirstProtoServiceGrpc
。即使它是来自第一台服务器的相同流,第一条消息也会创建自己的流到第二台服务器,第二条消息会创建自己的流到第二台服务器,依此类推。这就解释了为什么你会遇到“发送很多请求到第二个服务器。
相反,中间层应该反映第一个服务器的作用(在这个例子中,我们只关注第一个 -> 中间 -> 第二个方向)。当第一个服务器创建一个新的流 (1) 到中间时,中间创建一个新的流 (2) 到第二个。当中间服务器在流 (1) 上(从第一台服务器)收到一条消息时,它将它发送到流 (2)。当流 (1) 关闭时,流 (2) 也关闭。等等。
反之亦然,反之亦然
这里有几个问题:
- 正如@SergiiTkachenko 所指出的,您为来自第一台服务器的每条消息创建一个新的 RPC 到第二台服务器。要解决此问题,请将对第二个服务器的调用移动到外部方法的开头 3 行。
- 对
requestObserver.onCompleted()
的调用应移至下面StreamObserver<First.RequestToFirstServer>
几行中的onCompleted()
(在logger.info("Everything okay");
旁边)。 - 你永远不会打电话给
responseObserver.onCompleted()
。您应该在StreamObserver<Second.ResponseFromSecondServer>
的onCompleted()
中这样做(在logger.info("sucess");
旁边)。 - 您应该将从一台服务器收到的错误信号发送给另一台服务器(
onError(...)
方法之间的关系应类似于onCompleted()
) - 您应该使用 setOnCancelHandler(...) 处理第一台服务器的取消并将它们传播到第二台服务器。
所以一般来说(和 onNext
、onError
、onCompleted
、“onCancel
”)其中一台服务器应该触发向另一台服务器发出相应的调用(在需要的地方“翻译”参数之后)。
最后,您应该使用 CallStreamObserver: disableAutoInboundFlowControl(), request(1), isReady() and setOnReadyHandler(...). You can always cast your outbound StreamObserver
s to CallStreamObserver
s. More specifically, you should cast responseObserver
to ServerCallStreamObserver and requestObserver
to ClientCallStreamObserver 中的方法尊重两台服务器的准备情况。
这个应该交叉实现:
- 如果其他服务器
isReady
. ,您应该 - 从一台服务器接收“
onReady
”回调应该触发来自另一台服务器的request(...)
-ing 1 条消息。
request(1)
在处理来自它的消息(在 onNext()
中)结束时来自给定服务器的消息
据我所知,从外部方法返回观察者后,您应该会收到来自两个服务器的初始“onReady
”回调,这将使一切都开始运转。但是我不是 100% 确定来自第二个服务器的回调并且目前无法验证它:如果你没有收到它的初始回调,只需在返回观察者之前从第一个服务器请求一条初始消息。