使用 streams[with zip4j] 生成 Big Zip 并将其上传到 s3

Generating Big Zip with streams[with zip4j] and uploading it to s3

我正在努力生成一个 zip 文件,该文件必须压缩大约 2000 个文档,总共约 1GB,然后将 zip 文件上传到 s3 存储桶中。

我正在使用 net.lingala.zip4j,这是一个非常好的 Java 处理 Zip 文件的库。 根据文档:https://github.com/srikanth-lingala/zip4j 我正在使用它的流处理部分。 该代码看起来与文档中的代码几乎相似:

public ByteArrayOutputStream compress(FileCompressingContext fileCompressingContext) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ZipOutputStream zos = new ZipOutputStream(baos)) {
    if (fileCompressingContext.getFiles() != null) {
        for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
            addFileToZip(zos, file.getContent(), file.getName());
        }
    }

    if (fileCompressingContext.getFolders() != null) {
        for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
            int i = 0;
            for (FileCompressingContext.File file : folder.getFiles()) {
                addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
            }
        }
    }
}

return baos;}

private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
    byte[] buff = new byte[4096];
    int readLen;

    ZipParameters zp = new ZipParameters();
    zp.setFileNameInZip(fileName);
    zos.putNextEntry(zp);
    try (InputStream is = new ByteArrayInputStream(file)) {
        while ((readLen = is.read(buff)) != -1) {
            zos.write(buff, 0, readLen);
        }
    }

    zos.closeEntry();
}

问题是 zos.closeEntry(); 在压缩 1000 个文档后抛出,java.lang.OutOfMemoryError:Java 堆 space:

java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[na:na]
at net.lingala.zip4j.io.outputstream.CountingOutputStream.write(CountingOutputStream.java:29) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipEntryOutputStream.write(ZipEntryOutputStream.java:33) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CipherOutputStream.write(CipherOutputStream.java:50) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CompressedOutputStream.write(CompressedOutputStream.java:26) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.deflate(DeflaterOutputStream.java:55) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.closeEntry(DeflaterOutputStream.java:63) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipOutputStream.closeEntry(ZipOutputStream.java:108) ~[zip4j-2.9.1.jar:na]

您认为有解决方案可以在生成 Zip 存档时将其递增地流式传输到 S3 吗!? 我的意思是像定期解析 ByteArrayOutputStream 并上传到 s3,然后重置 baos..

如果不能,还有哪些选择?写入磁盘,读取并上传到 s3?嗯,或者可能分批压缩?

出于好奇,我尝试过批量处理文档。就像在 100 个文档之后,写入 Zip,然后重新执行该过程。这里的问题是每 100 个文档将覆盖现有的 Zip。所以这又是行不通的。 我试着为每 100 个文档调用它:

new ZipFile("test.zip").addStream(new ByteArrayInputStream(baos_lisb.toByteArray()), zp);

,但正如我所说,它覆盖了 zip 内容,所以它没有追加。

在此先致谢

有趣的是,这是在本地机器上,而我在 zip 生成过程中遇到 OutOfMemoryError

在测试环境中,我在检索文件时出现OutOfMemoryError。所以 Hibernate 也在抱怨。这是先于一代人的一步。这可能是因为本地机器有 16GB 而测试环境只有 1GB。

因此解决方案是根据以下步骤构建的:

  1. 使用 Hibernate 批量检索文件,以及 (flush/clean) 事务性实体管理器,以强制 Hibernate 不将所有文件保存在内存中。批量大小为:50 个文档。
  2. 使用 Aws 分段上传调整 zip4j 压缩的代码,以便仅压缩和上传一批文件,然后重置缓冲区,以避免内存不足。

Step2 的设计和改编基于:https://www.bennadel.com/blog/3971-generate-and-incrementally-stream-a-zip-archive-to-amazon-s3-using-multipart-uploads-in-lucee-cfml-5-3-7-47.htm

所以最初问题的代码变成了如下:

    @Override
public void compressAndPublish(final FileCompressingContext fileCompressingContext) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ZipOutputStream zos = new ZipOutputStream(baos);
    if (fileCompressingContext.getFiles() != null) {
        for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
            addFileToZip(zos, file.getContent(), file.getName());
        }
    }

if (fileCompressingContext.getFolders() != null) {
    // 1. initialize multi part
    String uploadId = fileHandlerService.initialiseMultiPart(FileHandlerContext.builder()
            .id(fileCompressingContext.getTaskId())
            .root(bucket)
            .fileName("file.zip")
            .build());

    int partNumber = 0;
    int docNr = 0;
    List<CompletedPart> completedParts = new ArrayList<>();

    for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
        while (!folder.getDocPks().isEmpty()) {
            extractDocuments(folder, fileCompressingContext);

            for (FileCompressingContext.File file : folder.getFiles()) {
                if (baos.size() > PART_SIZE) {
                    log.debug("Id:{} - Preparing for update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
                    FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                            .id(fileCompressingContext.getTaskId())
                            .root(bucket)
                            .fileName(file.zip)
                            .fileContent(baos.toByteArray())
                            .build();
                    // 2. upload parts of the zip
                    completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));

                    partNumber++;
                    baos.reset();
                }

                addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
                docNr++;
            }

            folder.getFiles().clear();
        }
    }

    finalizeZipContent(zos, baos);

    // 3. checks is there are any data remained under 5Mb
    if (baos.size() != 0) {
        log.debug("Id:{} - Preparing LAST update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);

        FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
                .id(fileCompressingContext.getTaskId())
                .root(bucket)
                .fileName(file.zip)
                .fileContent(baos.toByteArray())
                .build();
        completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
    }

    // 4. finish multipart operation
    FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
            .id(fileCompressingContext.getTaskId())
            .root(bucket)
            .fileName(file.zip)
            .build();
    fileHandlerService.finishMultipartUpload(fileHandlerContext, uploadId, completedParts);

    log.debug("Id:{} - Multipart upload finished with partNr:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
} else {
    finalizeZipContent(zos, baos);

    FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
            .id(fileCompressingContext.getTaskId())
            .root(bucket)
            .fileName("file.zip")
            .fileContent(baos.toByteArray())
            .fileExtension("application/zip")
            .build();
    fileHandlerService.store(fileHandlerContext);
}

}

所以唯一改变的是与 aws multipart 的集成,它允许以数据块的形式上传大数据。以及每次上传后缓冲区的重置:baos.reset();

另外一个重要的步骤是这个方法:

private void finalizeZipContent(ZipOutputStream zos, ByteArrayOutputStream baos) throws IOException {
zos.flush();
zos.close();
baos.close();
}

,关闭 ZipOutputStream 和 ByteArrayOutputStream。如果最后没有完成这一步,zip 将看起来像损坏的。

还有方法addFileToZip(...)可以写得更简单:

private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
ZipParameters zp = new ZipParameters();
zp.setFileNameInZip(fileName);
zos.putNextEntry(zp);
zos.write(file);
zos.closeEntry();
    zos.flush();
}

,不需要定义数组的固定大小字节

真心希望这能对某人有所帮助并节省一些时间。干杯