gRPC slow serialization on large dataset
I know Google states that protobufs don't support large messages (i.e. greater than 1 MB), but I'm trying to stream a dataset using gRPC that's tens of megabytes, and it seems like some people say it's fine, or at least workable with some splitting...
However, when I try to send an array this way (repeated uint32), it takes roughly 20 seconds on the same local machine.
#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (PhotonRecordsReply) {}
}

message PhotonRecordsRequest {
  string fileName = 1;
}

message PhotonRecordsReply {
  repeated uint32 PhotonRecords = 1;
}
where PhotonRecordsReply needs to hold about 10 million uint32 values (roughly 40 MB of raw data)...
Does anyone know how to speed this up, or what technique would be more appropriate?
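(For reference: a single unary reply this size also exceeds gRPC's default 4 MB receive limit, so the client has to raise it explicitly. A minimal sketch of that, assuming the pas_pb2 / pas_pb2_grpc stubs generated from the proto above and a server on localhost:50051; the file name is a placeholder:

import grpc

import pas_pb2
import pas_pb2_grpc

channel = grpc.insecure_channel(
    "localhost:50051",
    # raise the default 4 MB receive limit to ~64 MB, comfortably above the reply size
    options=[("grpc.max_receive_message_length", 64 * 1024 * 1024)],
)
stub = pas_pb2_grpc.PASStub(channel)
reply = stub.getPhotonRecords(pas_pb2.PhotonRecordsRequest(fileName="my_data_file"))
print(len(reply.PhotonRecords))

This only makes the unary call possible; it doesn't by itself make it fast.)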
So I think I've implemented streaming based on the comments and answers given, but it still takes the same amount of time:
#proto
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}
class PAS_GRPC(pas_pb2_grpc.PASServicer):
    def getPhotonRecords(self, request: pas_pb2.PhotonRecordsRequest, _context):
        raw_data_bytes = flb_tools.read_data_bytes(request.fileName)
        data = flb_tools.reshape_flb_data(raw_data_bytes)
        index = 0
        chunk_size = 1024
        len_data = len(data)
        while index < len_data:
            # last chunk
            if index + chunk_size > len_data:
                yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:])
            # all other chunks
            else:
                yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:index + chunk_size])
            index += chunk_size
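For reference, this is roughly how the matching client consumes that stream and times the transfer end to end (a sketch, assuming the same generated pas_pb2 / pas_pb2_grpc stubs and a server on localhost:50051):

import time

import grpc

import pas_pb2
import pas_pb2_grpc

def fetch_photon_records(target: str, file_name: str) -> list:
    with grpc.insecure_channel(target) as channel:
        stub = pas_pb2_grpc.PASStub(channel)
        records = []
        start = time.perf_counter()
        # a server-streaming call returns an iterator of PhotonRecordsReply chunks
        for reply in stub.getPhotonRecords(pas_pb2.PhotonRecordsRequest(fileName=file_name)):
            records.extend(reply.PhotonRecords)
        print(f"received {len(records)} values in {time.perf_counter() - start:.2f} s")
        return records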
Minimal repro: Github example
Changing it to use a stream should help. The transfer took under 2 seconds for me. Note that this was on localhost without SSL. I threw this code together quickly; I did run it and it worked. I'm not sure, for example, what happens if the file isn't a multiple of 4 bytes. Also, the bytes are read in Java's default byte order (big-endian).
I created my 10 MB file like this:
dd if=/dev/random of=my_10mb_file bs=1024 count=10240
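(If dd isn't handy, a short Python sketch produces an equivalent file:

import os

# write 10 MB of random bytes, same as the dd command above
with open("my_10mb_file", "wb") as f:
    f.write(os.urandom(10 * 1024 * 1024))

)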
Here is the service definition. The only thing I added is the response stream.
service PAS {
  // analyze single file
  rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}
Here is the server implementation.
public class PhotonsServerImpl extends PASImplBase {

    @Override
    public void getPhotonRecords(PhotonRecordsRequest request, StreamObserver<PhotonRecordsReply> responseObserver) {
        log.info("inside getPhotonRecords");
        // open the file; I suggest using the java.nio API for the fastest read times
        Path file = Paths.get(request.getFileName());
        try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ)) {
            int blockSize = 1024 * 4;
            ByteBuffer byteBuffer = ByteBuffer.allocate(blockSize);
            boolean done = false;
            while (!done) {
                PhotonRecordsReply.Builder response = PhotonRecordsReply.newBuilder();
                // read up to 1024 ints (4096 bytes) from the file
                byteBuffer.clear();
                int read = fileChannel.read(byteBuffer);
                if (read < blockSize) {
                    done = true;
                }
                // write to the response
                byteBuffer.flip();
                for (int index = 0; index < read / 4; index++) {
                    response.addPhotonRecords(byteBuffer.getInt());
                }
                // send the response
                responseObserver.onNext(response.build());
            }
        } catch (Exception e) {
            log.error("", e);
            responseObserver.onError(
                Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException());
            // onError terminates the call; don't also call onCompleted
            return;
        }
        responseObserver.onCompleted();
        log.info("exit getPhotonRecords");
    }
}
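Since the ints are read in Java's default big-endian order, a Python consumer of the same raw file would need to match that byte order; a hypothetical sketch with struct:

import struct

def read_uint32_be(path: str) -> list:
    # read the whole file and decode it as big-endian uint32s,
    # matching Java's default ByteBuffer byte order
    with open(path, "rb") as f:
        raw = f.read()
    count = len(raw) // 4  # ignore any trailing partial word
    return list(struct.unpack(f">{count}I", raw[:count * 4]))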
The client just logs the size of each array received.
public long getPhotonRecords(ManagedChannel channel) {
    if (log.isInfoEnabled())
        log.info("Enter - getPhotonRecords");
    PASGrpc.PASBlockingStub photonClient = PASGrpc.newBlockingStub(channel);
    PhotonRecordsRequest request = PhotonRecordsRequest.newBuilder()
            .setFileName("/udata/jdrummond/logs/my_10mb_file")
            .build();
    photonClient.getPhotonRecords(request).forEachRemaining(photonRecordsReply -> {
        log.info("got this many photons: {}", photonRecordsReply.getPhotonRecordsCount());
    });
    return 0;
}