等待订阅方法将所有数据放入数组

Wait subscribe method to get all data into array

我有一个端点,它的目的是接收一个 csv 文件,对其名称进行一些更改,然后将其发送到一个方法,以将其包含在单个文件中的所有数据上传到 Google云为纯文本。

该文件可以有超过 100,000 条记录,所以在解析它时我必须将所有数据保存在一个变量中,然后将其保存在 Google 云端。今天我可以做到,但是我一直在覆盖同一个文件,因为我不知道如何在方法中指示等待订阅的完整过程,然后再上传文件,所以每次添加数据文件再次上传到阵列。

虽然方法符合思路,但我想提高性能,因为上传一个只有2mb的文件,100,000条记录大约需要15分钟。有什么想法吗?

private Storage storage;

private void uploadToGoogleCloudStorage(FilePart filePart, BlobInfo blobInfo) throws IOException {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
        filePart.content()
                .subscribe(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    try {
                        bos.write(bytes);
                        storage.createFrom(blobInfo, new ByteArrayInputStream(bos.toByteArray()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });

    }
}

终于找到解决方案了。我更改了对地图的订阅,然后我从 flux 获得了最后的响应,并且我订阅了响应以使用存储接口将数据上传到 google 云存储(来自 google 的包使用他们的api)

private Storage storage;

private void uploadToGoogleCloudStorage(FilePart filePart, BlobInfo blobInfo) throws IOException {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
        filePart.content()
                .map(dataBuffer -> {
                    byte[] bytes = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(bytes);
                    DataBufferUtils.release(dataBuffer);
                    try {
                        bos.write(bytes);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return bos;
                }).last().subscribe(data -> {
                    try {
                        storage.createFrom(blobInfo, new ByteArrayInputStream(bos.toByteArray()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
    }

}