使用 streams[with zip4j] 生成 Big Zip 并将其上传到 s3
Generating Big Zip with streams[with zip4j] and uploading it to s3
我正在努力生成一个 zip 文件,该文件必须压缩大约 2000 个文档,总共约 1GB,然后将 zip 文件上传到 s3 存储桶中。
我正在使用 net.lingala.zip4j,这是一个非常好的 Java 处理 Zip 文件的库。
根据文档:https://github.com/srikanth-lingala/zip4j 我正在使用它的流处理部分。
该代码看起来与文档中的代码几乎相似:
public ByteArrayOutputStream compress(FileCompressingContext fileCompressingContext) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ZipOutputStream zos = new ZipOutputStream(baos)) {
if (fileCompressingContext.getFiles() != null) {
for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
addFileToZip(zos, file.getContent(), file.getName());
}
}
if (fileCompressingContext.getFolders() != null) {
for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
int i = 0;
for (FileCompressingContext.File file : folder.getFiles()) {
addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
}
}
}
}
return baos;}
private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
byte[] buff = new byte[4096];
int readLen;
ZipParameters zp = new ZipParameters();
zp.setFileNameInZip(fileName);
zos.putNextEntry(zp);
try (InputStream is = new ByteArrayInputStream(file)) {
while ((readLen = is.read(buff)) != -1) {
zos.write(buff, 0, readLen);
}
}
zos.closeEntry();
}
问题是 zos.closeEntry();
在压缩 1000 个文档后抛出,java.lang.OutOfMemoryError:Java 堆 space:
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[na:na]
at net.lingala.zip4j.io.outputstream.CountingOutputStream.write(CountingOutputStream.java:29) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipEntryOutputStream.write(ZipEntryOutputStream.java:33) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CipherOutputStream.write(CipherOutputStream.java:50) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CompressedOutputStream.write(CompressedOutputStream.java:26) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.deflate(DeflaterOutputStream.java:55) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.closeEntry(DeflaterOutputStream.java:63) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipOutputStream.closeEntry(ZipOutputStream.java:108) ~[zip4j-2.9.1.jar:na]
您认为有解决方案可以在生成 Zip 存档时将其递增地流式传输到 S3 吗!?
我的意思是像定期解析 ByteArrayOutputStream 并上传到 s3,然后重置 baos..
如果不能,还有哪些选择?写入磁盘,读取并上传到 s3?嗯,或者可能分批压缩?
出于好奇,我尝试过批量处理文档。就像在 100 个文档之后,写入 Zip,然后重新执行该过程。这里的问题是每 100 个文档将覆盖现有的 Zip。所以这又是行不通的。
我试着为每 100 个文档调用它:
new ZipFile("test.zip").addStream(new ByteArrayInputStream(baos_lisb.toByteArray()), zp);
,但正如我所说,它覆盖了 zip 内容,所以它没有追加。
在此先致谢
有趣的是,这是在本地机器上,而我在 zip 生成过程中遇到 OutOfMemoryError。
在测试环境中,我在检索文件时出现OutOfMemoryError。所以 Hibernate 也在抱怨。这是先于一代人的一步。这可能是因为本地机器有 16GB 而测试环境只有 1GB。
因此解决方案是根据以下步骤构建的:
- 使用 Hibernate 批量检索文件,以及 (flush/clean)
事务性实体管理器,以强制 Hibernate 不将所有文件保存在内存中。批量大小为:50 个文档。
- 使用 Aws 分段上传调整 zip4j 压缩的代码,以便仅压缩和上传一批文件,然后重置缓冲区,以避免内存不足。
所以最初问题的代码变成了如下:
@Override
public void compressAndPublish(final FileCompressingContext fileCompressingContext) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zos = new ZipOutputStream(baos);
if (fileCompressingContext.getFiles() != null) {
for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
addFileToZip(zos, file.getContent(), file.getName());
}
}
if (fileCompressingContext.getFolders() != null) {
// 1. initialize multi part
String uploadId = fileHandlerService.initialiseMultiPart(FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName("file.zip")
.build());
int partNumber = 0;
int docNr = 0;
List<CompletedPart> completedParts = new ArrayList<>();
for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
while (!folder.getDocPks().isEmpty()) {
extractDocuments(folder, fileCompressingContext);
for (FileCompressingContext.File file : folder.getFiles()) {
if (baos.size() > PART_SIZE) {
log.debug("Id:{} - Preparing for update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.fileContent(baos.toByteArray())
.build();
// 2. upload parts of the zip
completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
partNumber++;
baos.reset();
}
addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
docNr++;
}
folder.getFiles().clear();
}
}
finalizeZipContent(zos, baos);
// 3. checks is there are any data remained under 5Mb
if (baos.size() != 0) {
log.debug("Id:{} - Preparing LAST update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.fileContent(baos.toByteArray())
.build();
completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
}
// 4. finish multipart operation
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.build();
fileHandlerService.finishMultipartUpload(fileHandlerContext, uploadId, completedParts);
log.debug("Id:{} - Multipart upload finished with partNr:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
} else {
finalizeZipContent(zos, baos);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName("file.zip")
.fileContent(baos.toByteArray())
.fileExtension("application/zip")
.build();
fileHandlerService.store(fileHandlerContext);
}
}
所以唯一改变的是与 aws multipart 的集成,它允许以数据块的形式上传大数据。以及每次上传后缓冲区的重置:baos.reset();
另外一个重要的步骤是这个方法:
private void finalizeZipContent(ZipOutputStream zos, ByteArrayOutputStream baos) throws IOException {
zos.flush();
zos.close();
baos.close();
}
,关闭 ZipOutputStream 和 ByteArrayOutputStream。如果最后没有完成这一步,zip 将看起来像损坏的。
还有方法addFileToZip(...)
可以写得更简单:
private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
ZipParameters zp = new ZipParameters();
zp.setFileNameInZip(fileName);
zos.putNextEntry(zp);
zos.write(file);
zos.closeEntry();
zos.flush();
}
,不需要定义数组的固定大小字节
真心希望这能对某人有所帮助并节省一些时间。干杯
我正在努力生成一个 zip 文件,该文件必须压缩大约 2000 个文档,总共约 1GB,然后将 zip 文件上传到 s3 存储桶中。
我正在使用 net.lingala.zip4j,这是一个非常好的 Java 处理 Zip 文件的库。 根据文档:https://github.com/srikanth-lingala/zip4j 我正在使用它的流处理部分。 该代码看起来与文档中的代码几乎相似:
public ByteArrayOutputStream compress(FileCompressingContext fileCompressingContext) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ZipOutputStream zos = new ZipOutputStream(baos)) {
if (fileCompressingContext.getFiles() != null) {
for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
addFileToZip(zos, file.getContent(), file.getName());
}
}
if (fileCompressingContext.getFolders() != null) {
for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
int i = 0;
for (FileCompressingContext.File file : folder.getFiles()) {
addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
}
}
}
}
return baos;}
private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
byte[] buff = new byte[4096];
int readLen;
ZipParameters zp = new ZipParameters();
zp.setFileNameInZip(fileName);
zos.putNextEntry(zp);
try (InputStream is = new ByteArrayInputStream(file)) {
while ((readLen = is.read(buff)) != -1) {
zos.write(buff, 0, readLen);
}
}
zos.closeEntry();
}
问题是 zos.closeEntry();
在压缩 1000 个文档后抛出,java.lang.OutOfMemoryError:Java 堆 space:
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[na:na]
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[na:na]
at net.lingala.zip4j.io.outputstream.CountingOutputStream.write(CountingOutputStream.java:29) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipEntryOutputStream.write(ZipEntryOutputStream.java:33) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CipherOutputStream.write(CipherOutputStream.java:50) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.CompressedOutputStream.write(CompressedOutputStream.java:26) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.deflate(DeflaterOutputStream.java:55) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.DeflaterOutputStream.closeEntry(DeflaterOutputStream.java:63) ~[zip4j-2.9.1.jar:na]
at net.lingala.zip4j.io.outputstream.ZipOutputStream.closeEntry(ZipOutputStream.java:108) ~[zip4j-2.9.1.jar:na]
您认为有解决方案可以在生成 Zip 存档时将其递增地流式传输到 S3 吗!? 我的意思是像定期解析 ByteArrayOutputStream 并上传到 s3,然后重置 baos..
如果不能,还有哪些选择?写入磁盘,读取并上传到 s3?嗯,或者可能分批压缩?
出于好奇,我尝试过批量处理文档。就像在 100 个文档之后,写入 Zip,然后重新执行该过程。这里的问题是每 100 个文档将覆盖现有的 Zip。所以这又是行不通的。 我试着为每 100 个文档调用它:
new ZipFile("test.zip").addStream(new ByteArrayInputStream(baos_lisb.toByteArray()), zp);
,但正如我所说,它覆盖了 zip 内容,所以它没有追加。
在此先致谢
有趣的是,这是在本地机器上,而我在 zip 生成过程中遇到 OutOfMemoryError。
在测试环境中,我在检索文件时出现OutOfMemoryError。所以 Hibernate 也在抱怨。这是先于一代人的一步。这可能是因为本地机器有 16GB 而测试环境只有 1GB。
因此解决方案是根据以下步骤构建的:
- 使用 Hibernate 批量检索文件,以及 (flush/clean) 事务性实体管理器,以强制 Hibernate 不将所有文件保存在内存中。批量大小为:50 个文档。
- 使用 Aws 分段上传调整 zip4j 压缩的代码,以便仅压缩和上传一批文件,然后重置缓冲区,以避免内存不足。
所以最初问题的代码变成了如下:
@Override
public void compressAndPublish(final FileCompressingContext fileCompressingContext) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zos = new ZipOutputStream(baos);
if (fileCompressingContext.getFiles() != null) {
for (FileCompressingContext.File file : fileCompressingContext.getFiles()) {
addFileToZip(zos, file.getContent(), file.getName());
}
}
if (fileCompressingContext.getFolders() != null) {
// 1. initialize multi part
String uploadId = fileHandlerService.initialiseMultiPart(FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName("file.zip")
.build());
int partNumber = 0;
int docNr = 0;
List<CompletedPart> completedParts = new ArrayList<>();
for (FileCompressingContext.Folder folder : fileCompressingContext.getFolders()) {
while (!folder.getDocPks().isEmpty()) {
extractDocuments(folder, fileCompressingContext);
for (FileCompressingContext.File file : folder.getFiles()) {
if (baos.size() > PART_SIZE) {
log.debug("Id:{} - Preparing for update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.fileContent(baos.toByteArray())
.build();
// 2. upload parts of the zip
completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
partNumber++;
baos.reset();
}
addFileToZip(zos, file.getContent(), folder.getName() + "/" + file.getName());
docNr++;
}
folder.getFiles().clear();
}
}
finalizeZipContent(zos, baos);
// 3. checks is there are any data remained under 5Mb
if (baos.size() != 0) {
log.debug("Id:{} - Preparing LAST update part:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.fileContent(baos.toByteArray())
.build();
completedParts.add(fileHandlerService.uploadPart(fileHandlerContext, uploadId, partNumber));
}
// 4. finish multipart operation
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName(file.zip)
.build();
fileHandlerService.finishMultipartUpload(fileHandlerContext, uploadId, completedParts);
log.debug("Id:{} - Multipart upload finished with partNr:{}, docNr:{}", fileCompressingContext.getTaskId(), partNumber, docNr);
} else {
finalizeZipContent(zos, baos);
FileHandlerContext fileHandlerContext = FileHandlerContext.builder()
.id(fileCompressingContext.getTaskId())
.root(bucket)
.fileName("file.zip")
.fileContent(baos.toByteArray())
.fileExtension("application/zip")
.build();
fileHandlerService.store(fileHandlerContext);
}
}
所以唯一改变的是与 aws multipart 的集成,它允许以数据块的形式上传大数据。以及每次上传后缓冲区的重置:baos.reset();
另外一个重要的步骤是这个方法:
private void finalizeZipContent(ZipOutputStream zos, ByteArrayOutputStream baos) throws IOException {
zos.flush();
zos.close();
baos.close();
}
,关闭 ZipOutputStream 和 ByteArrayOutputStream。如果最后没有完成这一步,zip 将看起来像损坏的。
还有方法addFileToZip(...)
可以写得更简单:
private void addFileToZip(ZipOutputStream zos, byte[] file, String fileName) throws IOException {
ZipParameters zp = new ZipParameters();
zp.setFileNameInZip(fileName);
zos.putNextEntry(zp);
zos.write(file);
zos.closeEntry();
zos.flush();
}
,不需要定义数组的固定大小字节
真心希望这能对某人有所帮助并节省一些时间。干杯