GRPC 异步 + 阻塞存根 Java
GRPC Async + Blocking Stub Java
我 运行 有点鸡生蛋还是蛋生鸡的问题。
案例:在远程客户端生成一个文件。客户端应通过 asynccstub 将文件传输到服务器。客户端还必须通过要存储在数据库中的阻塞存根传输元数据。
问题:
如果我先执行异步操作,那么文件数据会先于元数据发送,因此服务器不知道文件的名称或放置位置.我最初打算 return 来自服务器的此信息(双向),但是流观察者不借自己设置匿名定义之外的变量。
如果我先做同步操作,我可以从服务器取回文件命名信息;但是,我需要将其打包成数据的“Chunks”。这还需要在 GRPC 迭代其流数据时不断打开和关闭保存文件,因为迭代器不容易重置(所以我不能只剥离第一个请求)。
作为最后一个选项,我可以将所有这些打包到异步请求中并使用任何同步调用进行分派。我相信这将提供一个可行的解决方案,但我担心在已经很大的请求上发送的数据量以及前面提到的低效率。
所以我的问题是:
- 有没有办法从响应观察器将全局变量设置为 'value.Message'。
- 或者,有没有办法将信息从同步调用传递到服务器端的异步调用?
异步响应观察者:
StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {
@Override
public void onNext(GrpcServerComm.UploadStatus value) {
if (value.getCode() != 1) {
Log.d("Error", "Upload Procedure Failure");
finishLatch.countDown();
}
}
@Override
public void onError(Throwable t) {
Log.d("Error", "Upload Response");
finishLatch.countDown();
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
};
相关protobufs
message UploadStatus {
string filename=1;
int32 code = 2;
}
message DataChunk
{
string filename=1;
bytes chunk = 2;
}
message VideoMetadata
{
string publisher =1;
string description =2;
string tags = 3;
double videolat= 4;
double videolong=5;
}
service DataUpload
{
rpc UploadData (stream DataChunk) returns(UploadStatus);
}
service ContentMetaData
{
rpc UploadMetaData(VideoMetadata) returns (UploadStatus);
}
Python 服务器端函数
class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
def UploadData(self,request_it,context):
response = proto_test_pb2.UploadStatus()
filename = str(random.getrandbits(32)) #server decides filename
response = filestream.writefile(filename,request_it)
return response
def writefile(filename, chunks):
response = proto_test_pb2.UploadStatus()
filename='tmp/'+filename
app_file = open(filename,"ab")
for chunk in chunks:
app_file.write(chunk.chunk)
app_file.close()
print('File Written')
response.Code=1
response.Message = "Succsesful write"
return response
Java 网友,这篇here.
有详细文章
我认为将它们作为 2 个单独的请求并不是一个好主意。相反,Metadata
和 DataChunk
应该合并为 1 个单一类型,如此处所示。
message FileUploadRequest {
VideoMetadata metaData = 1;
DataChunk dataChunk = 2;
}
现在您可能会问为什么我们必须为每个请求发送元数据!这就是 gRPC oneof 类型有帮助的地方。
message FileUploadRequest {
oneof upload_data {
VideoMetadata metaData = 1;
DataChunk dataChunk = 2;
}
}
你的服务应该是这样的
service FileuploadService {
rpc UploadData (stream FileUploadRequest) returns(UploadStatus);
}
当您使用 Oneof 时,在您生成的代码中,oneof 字段与常规字段具有相同的 getter 和 setter。您还可以获得一种特殊的方法来检查 oneof 中设置了哪个值(如果有的话)。首先发送元数据,然后发送块。根据,设置哪一个,然后你就可以据此做出决定了。
我 运行 有点鸡生蛋还是蛋生鸡的问题。
案例:在远程客户端生成一个文件。客户端应通过 asynccstub 将文件传输到服务器。客户端还必须通过要存储在数据库中的阻塞存根传输元数据。
问题:
如果我先执行异步操作,那么文件数据会先于元数据发送,因此服务器不知道文件的名称或放置位置.我最初打算 return 来自服务器的此信息(双向),但是流观察者不借自己设置匿名定义之外的变量。
如果我先做同步操作,我可以从服务器取回文件命名信息;但是,我需要将其打包成数据的“Chunks”。这还需要在 GRPC 迭代其流数据时不断打开和关闭保存文件,因为迭代器不容易重置(所以我不能只剥离第一个请求)。
作为最后一个选项,我可以将所有这些打包到异步请求中并使用任何同步调用进行分派。我相信这将提供一个可行的解决方案,但我担心在已经很大的请求上发送的数据量以及前面提到的低效率。
所以我的问题是:
- 有没有办法从响应观察器将全局变量设置为 'value.Message'。
- 或者,有没有办法将信息从同步调用传递到服务器端的异步调用?
异步响应观察者:
StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {
@Override
public void onNext(GrpcServerComm.UploadStatus value) {
if (value.getCode() != 1) {
Log.d("Error", "Upload Procedure Failure");
finishLatch.countDown();
}
}
@Override
public void onError(Throwable t) {
Log.d("Error", "Upload Response");
finishLatch.countDown();
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
};
相关protobufs
message UploadStatus {
string filename=1;
int32 code = 2;
}
message DataChunk
{
string filename=1;
bytes chunk = 2;
}
message VideoMetadata
{
string publisher =1;
string description =2;
string tags = 3;
double videolat= 4;
double videolong=5;
}
service DataUpload
{
rpc UploadData (stream DataChunk) returns(UploadStatus);
}
service ContentMetaData
{
rpc UploadMetaData(VideoMetadata) returns (UploadStatus);
}
Python 服务器端函数
class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
def UploadData(self,request_it,context):
response = proto_test_pb2.UploadStatus()
filename = str(random.getrandbits(32)) #server decides filename
response = filestream.writefile(filename,request_it)
return response
def writefile(filename, chunks):
response = proto_test_pb2.UploadStatus()
filename='tmp/'+filename
app_file = open(filename,"ab")
for chunk in chunks:
app_file.write(chunk.chunk)
app_file.close()
print('File Written')
response.Code=1
response.Message = "Succsesful write"
return response
Java 网友,这篇here.
有详细文章我认为将它们作为 2 个单独的请求并不是一个好主意。相反,Metadata
和 DataChunk
应该合并为 1 个单一类型,如此处所示。
message FileUploadRequest {
VideoMetadata metaData = 1;
DataChunk dataChunk = 2;
}
现在您可能会问为什么我们必须为每个请求发送元数据!这就是 gRPC oneof 类型有帮助的地方。
message FileUploadRequest {
oneof upload_data {
VideoMetadata metaData = 1;
DataChunk dataChunk = 2;
}
}
你的服务应该是这样的
service FileuploadService {
rpc UploadData (stream FileUploadRequest) returns(UploadStatus);
}
当您使用 Oneof 时,在您生成的代码中,oneof 字段与常规字段具有相同的 getter 和 setter。您还可以获得一种特殊的方法来检查 oneof 中设置了哪个值(如果有的话)。首先发送元数据,然后发送块。根据,设置哪一个,然后你就可以据此做出决定了。