Handling Streaming TarArchiveEntry to S3 Bucket from a .tar.gz file

I am using an AWS Lambda function to decompress and walk through tar.gz files, then upload the individual entries back to S3 while preserving the original directory structure.

I am running into an issue streaming a TarArchiveEntry to an S3 bucket via a PutObjectRequest. While the first entry streams successfully, attempting to getNextTarEntry() on the TarArchiveInputStream throws a NullPointerException because the underlying gzip inflater is null, even though it had a valid value prior to the s3.putObject(new PutObjectRequest(...)) call.

I have not been able to find any documentation on how or why the gzip input stream's inflater property gets set to null after part of the stream has been sent to S3. EDIT: Further investigation shows that the AWS call appears to close the input stream once it has finished uploading the specified content length... I have not yet found a way to prevent this behavior.

Below is essentially what my code looks like. Thanks in advance for your help, comments, and suggestions.

public String handleRequest(S3Event s3Event, Context context) {

    TarArchiveInputStream tarInput = null;
    try {
        S3Event.S3EventNotificationRecord s3EventRecord = s3Event.getRecords().get(0);
        String bucketName = s3EventRecord.getS3().getBucket().getName();

        // Object key may have spaces or unicode non-ASCII characters.
        String srcKeyInput = s3EventRecord.getS3().getObject().getKey();

        System.out.println("Received valid request from bucket: " + bucketName + " with srckey: " + srcKeyInput);

        String bucketFolder = srcKeyInput.substring(0, srcKeyInput.lastIndexOf('/') + 1);
        System.out.println("File parent directory: " + bucketFolder);

        final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

        tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(getObjectContent(s3Client, bucketName, srcKeyInput)));

        TarArchiveEntry currentEntry = tarInput.getNextTarEntry();

        while (currentEntry != null) {
            String fileName = currentEntry.getName();
            System.out.println("For path = " + fileName);

            // checking if looking at a file (vs a directory)
            if (currentEntry.isFile()) {

                System.out.println("Copying " + fileName + " to " + bucketFolder + fileName + " in bucket " + bucketName);
                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(currentEntry.getSize());

                s3Client.putObject(new PutObjectRequest(bucketName, bucketFolder + fileName, tarInput, metadata)); // contents are properly and successfully sent to s3
                System.out.println("Done!");
            }

            currentEntry = tarInput.getNextTarEntry(); // NPE here because the underlying gzip inflater is null
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeQuietly(tarInput);
    }
    return "Complete";
}

That's right, AWS closes the InputStream supplied to the PutObjectRequest, and I don't know of any way to instruct AWS not to do so.

However, you can wrap the TarArchiveInputStream in a CloseShieldInputStream from Commons IO, like this:

InputStream shieldedInput = new CloseShieldInputStream(tarInput);

s3Client.putObject(new PutObjectRequest(bucketName, bucketFolder + fileName, shieldedInput, metadata));

When AWS closes the supplied CloseShieldInputStream, the underlying TarArchiveInputStream remains open.
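
For completeness, here is a minimal, untested sketch of what the question's upload loop could look like with the shield in place. It reuses the names from the code above (bucketName, bucketFolder, s3Client, tarInput); CloseShieldInputStream comes from org.apache.commons.io.input:

TarArchiveEntry currentEntry = tarInput.getNextTarEntry();
while (currentEntry != null) {
    if (currentEntry.isFile()) {
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(currentEntry.getSize());

        // The shield absorbs the close() issued by the S3 client after the upload,
        // so tarInput itself stays open for the next entry.
        InputStream shieldedInput = new CloseShieldInputStream(tarInput);
        s3Client.putObject(new PutObjectRequest(bucketName, bucketFolder + currentEntry.getName(), shieldedInput, metadata));
    }
    currentEntry = tarInput.getNextTarEntry(); // no longer throws: the gzip inflater is still intact
}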


PS: I don't know what ByteArrayInputStream(tarInput.getCurrentEntry()) does, but it looks very odd. I ignored it for the purposes of this answer.