GRPC 异步 + 阻塞存根 Java

GRPC Async + Blocking Stub Java

我 运行 有点鸡生蛋还是蛋生鸡的问题。

案例:在远程客户端生成一个文件。客户端应通过 asynccstub 将文件传输到服务器。客户端还必须通过要存储在数据库中的阻塞存根传输元数据。

问题:

  1. 如果我先执行异步操作,那么文件数据会先于元数据发送,因此服务器不知道文件的名称或放置位置.我最初打算 return 来自服务器的此信息(双向),但是流观察者不借自己设置匿名定义之外的变量。

  2. 如果我先做同步操作,我可以从服务器取回文件命名信息;但是,我需要将其打包成数据的“Chunks”。这还需要在 GRPC 迭代其流数据时不断打开和关闭保存文件,因为迭代器不容易重置(所以我不能只剥离第一个请求)。

  3. 作为最后一个选项,我可以将所有这些打包到异步请求中并使用任何同步调用进行分派。我相信这将提供一个可行的解决方案,但我担心在已经很大的请求上发送的数据量以及前面提到的低效率。

所以我的问题是:

  1. 有没有办法从响应观察器将全局变量设置为 'value.Message'。
  2. 或者,有没有办法将信息从同步调用传递到服务器端的异步调用?

异步响应观察者:

StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {

            @Override
            public void onNext(GrpcServerComm.UploadStatus value) {
                if (value.getCode() != 1) {
                    Log.d("Error", "Upload  Procedure Failure");
                    finishLatch.countDown();
                }
            }
            @Override
            public void onError(Throwable t) {
                Log.d("Error", "Upload Response");
                finishLatch.countDown();
            }
            @Override
            public void onCompleted() {
                finishLatch.countDown();
            }
        }; 

相关protobufs

message UploadStatus {
    string filename=1;
    int32 code = 2;
}
message DataChunk
{
    string filename=1;
    bytes chunk = 2;
}
message VideoMetadata
{
    string publisher =1;
    string description =2;
    string tags = 3;
    double videolat= 4;
    double videolong=5;
}
service DataUpload
{
    rpc UploadData (stream DataChunk) returns(UploadStatus);
}
service ContentMetaData
{
    rpc UploadMetaData(VideoMetadata) returns (UploadStatus);
}

Python 服务器端函数

class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
    def UploadData(self,request_it,context):
        response = proto_test_pb2.UploadStatus()
        filename = str(random.getrandbits(32)) #server decides filename
        response = filestream.writefile(filename,request_it)
        return response

def writefile(filename, chunks):
    response = proto_test_pb2.UploadStatus()
    filename='tmp/'+filename
    app_file = open(filename,"ab")
    for chunk in chunks:
        app_file.write(chunk.chunk)
    app_file.close()
    print('File Written')
    response.Code=1
    response.Message = "Succsesful write"
    return response

Java 网友,这篇here.

有详细文章

我认为将它们作为 2 个单独的请求并不是一个好主意。相反,MetadataDataChunk 应该合并为 1 个单一类型,如此处所示。

message FileUploadRequest {
    VideoMetadata metaData = 1;
    DataChunk dataChunk = 2;
}

现在您可能会问为什么我们必须为每个请求发送元数据!这就是 gRPC oneof 类型有帮助的地方。

message FileUploadRequest {
  oneof upload_data {
    VideoMetadata metaData = 1;
    DataChunk dataChunk = 2;
  }
}

你的服务应该是这样的

service FileuploadService {
    rpc UploadData (stream FileUploadRequest) returns(UploadStatus);
}

当您使用 Oneof 时,在您生成的代码中,oneof 字段与常规字段具有相同的 getter 和 setter。您还可以获得一种特殊的方法来检查 oneof 中设置了哪个值(如果有的话)。首先发送元数据,然后发送块。根据,设置哪一个,然后你就可以据此做出决定了。