大型数据集上的 gRPC 序列化缓慢

gRPC slow serialization on large dataset

我知道 google 指出 protobufs 不支持大消息 (i.e. greater than 1 MB), but I'm trying to stream a dataset using gRPC that's tens of megabytes, and it seems like some people say it's , or at least with some splitting...

但是,当我尝试以这种方式发送数组时 (repeated uint32),在同一台本地计算机上需要大约 20 秒。

#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (PhotonRecordsReply) {}
}

message PhotonRecordsRequest {
  string fileName = 1;
}

message PhotonRecordsReply {
  repeated uint32 PhotonRecords = 1;
}

其中 PhotonRecordsReply 的长度需要约为 1000 万 uint32...

有人知道如何加快速度吗?或者哪种技术更合适?

所以我想我已经根据给出的评论和答案实现了流式传输,但它仍然需要相同的时间:

#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}
class PAS_GRPC(pas_pb2_grpc.PASServicer):

    def getPhotonRecords(self, request: pas_pb2.PhotonRecordsRequest, _context):
        raw_data_bytes = flb_tools.read_data_bytes(request.fileName)
        data = flb_tools.reshape_flb_data(raw_data_bytes)
        index = 0
        chunk_size = 1024
        len_data = len(data)
        while index < len_data:
            # last chunk
            if index + chunk_size > len_data:
                yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:])
            # all other chunks
            else:
                yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:index + chunk_size])
            index += chunk_size

最小重现 Github example

如果您将其更改为使用流应该会有帮助。为我转移不到2秒。请注意,这是在没有 ssl 的情况下在本地主机上进行的。我把这段代码放在一起。我做了 运行 它并且成功了。例如,不确定如果文件不是 4 字节的倍数会发生什么。此外,读取的字节顺序是 Java.

的默认顺序

我这样制作了我的 10 兆文件。

dd if=/dev/random  of=my_10mb_file bs=1024 count=10240

这是服务定义。我在此处添加的唯一内容是响应流。

service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}

这是服务器实现。

public class PhotonsServerImpl extends PASImplBase {

  @Override
  public void getPhotonRecords(PhotonRecordsRequest request, StreamObserver<PhotonRecordsReply> responseObserver) {
    log.info("inside getPhotonRecords");
    
    // open the file, I suggest using java.nio API for the fastest read times.
    Path file = Paths.get(request.getFileName());
    try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ)) {

      int blockSize = 1024 * 4;
      ByteBuffer byteBuffer = ByteBuffer.allocate(blockSize);
      boolean done = false;
      while (!done) {
        PhotonRecordsReply.Builder response = PhotonRecordsReply.newBuilder();
        // read 1000 ints from the file.
        byteBuffer.clear();
        int read = fileChannel.read(byteBuffer);
        if (read < blockSize) {
          done = true;
        }
        // write to the response.
        byteBuffer.flip();
        for (int index = 0; index < read / 4; index++) {
          response.addPhotonRecords(byteBuffer.getInt());
        }
        // send the response
        responseObserver.onNext(response.build());
      }
    } catch (Exception e) {
      log.error("", e);
      responseObserver.onError(
          Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException());
    }
    responseObserver.onCompleted();
    log.info("exit getPhotonRecords");

  }
}

客户端只记录接收到的数组的大小。

public long getPhotonRecords(ManagedChannel channel) {
  if (log.isInfoEnabled())
    log.info("Enter - getPhotonRecords ");

  PASGrpc.PASBlockingStub photonClient = PASGrpc.newBlockingStub(channel);

  PhotonRecordsRequest request = PhotonRecordsRequest.newBuilder().setFileName("/udata/jdrummond/logs/my_10mb_file").build();

  photonClient.getPhotonRecords(request).forEachRemaining(photonRecordsReply -> {
    log.info("got this many photons: {}", photonRecordsReply.getPhotonRecordsCount());
  });

  return 0;
}