Too many open processes when using AWS S3 TransferManager with MultipartUpload and S3ProgressListener for ResumableTransfer

We have implemented AWS TransferManager with MultipartUpload and ResumableTransfer for file uploads.

We implemented the solution following these articles:

https://aws.amazon.com/blogs/developer/pausing-and-resuming-transfers-using-transfer-manager/
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-s3-transfermanager.html
https://aws.amazon.com/blogs/mobile/pause-and-resume-amazon-s3-transfers-using-the-aws-mobile-sdk-for-android/

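When we uploaded files without MultipartUpload and ResumableTransfer, the number of processes stayed well under control, but as soon as we implemented the approach above, the process count started growing exponentially.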

Sample code below:

try {
    AmazonS3 s3client = s3ClientFactory.createClient();
    xferManager = TransferManagerBuilder.standard()
            .withS3Client(s3client)
            .withMinimumUploadPartSize(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
            .withMultipartUploadThreshold(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
            .withExecutorFactory(() -> Executors.newFixedThreadPool(3))
            .build();
    String resumableTargetFile = "/path/to/resumableTargetFile";

    Upload upload = xferManager.upload(putRequest, new S3ProgressListener() {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            double pct = progressEvent.getBytesTransferred() * 100.0 / progressEvent.getBytes();
            LOGGER.info("Upload status for file - " + fileName + " is: " + Double.toString(pct) + "%");

            switch (progressEvent.getEventType()) {
                case TRANSFER_STARTED_EVENT:
                    LOGGER.info("Started uploading file {} to S3", fileName);
                    break;
                case TRANSFER_COMPLETED_EVENT:
                    LOGGER.info("Completed uploading file {} to S3", fileName);
                    break;
                case TRANSFER_CANCELED_EVENT:
                    LOGGER.warn("Upload of file {} to S3 was aborted", fileName);
                    break;
                case TRANSFER_FAILED_EVENT:
                    LOGGER.error("Failed uploading file {} to S3", fileName);
                    break;
                default:
                    break;
            }
        }

        @Override
        public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {
            executor.submit(() -> {
                saveTransferState(persistableTransfer, resumableTargetFile);
            });
        }
    });

    UploadResult uploadResult = upload.waitForUploadResult();
    streamMD5 = uploadResult.getETag();

    if (upload.isDone()) {
        LOGGER.info("File {} uploaded successfully to S3 bucket {}", fileNameKey, bucketName);
    }
} catch (AmazonServiceException ase) {
    // The call was transmitted successfully, but Amazon S3 couldn't process
    // it, so it returned an error response.
    LOGGER.error("AmazonServiceException occurred: " + ase.getMessage());
} catch (SdkClientException sdce) {
    // Amazon S3 couldn't be contacted for a response, or the client
    // couldn't parse the response from Amazon S3.
    LOGGER.error("SdkClientException occurred: " + sdce.getMessage());
} catch (AmazonClientException ace) {
    LOGGER.error("AWS Exception occurred: " + ace.getMessage());
} catch (Exception e) {
    LOGGER.error("Exception occurred during files processing: " + e.getMessage());
} finally {
    xferManager.shutdownNow(true);
    return streamMD5;
}

I wanted to see whether anyone has run into a similar issue, and would appreciate any input on it.

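Although the AWS documentation says that shutting down the TransferManager with TransferManager.shutdownNow(true) should shut down the TransferManager and its related child objects, we found that the ExecutorService created inside the S3ProgressListener used for ResumableTransfer was never shut down after the TransferManager was shut down.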

Once we explicitly shut down the executor by calling executor.shutdown(), the exponential growth in open processes was resolved.
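
The fix can be illustrated with a minimal sketch that uses only java.util.concurrent, so it runs without the AWS SDK. StateSavingListener is a hypothetical stand-in for the S3ProgressListener in the question, and the in-memory queue stands in for saveTransferState; neither is part of the SDK. The point it shows is that a pool from Executors.newFixedThreadPool keeps its worker thread alive until someone shuts it down, so each listener needs its own explicit shutdown.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a progress listener that saves transfer state on its own thread. */
final class StateSavingListener {
    // One worker thread per listener. It is a non-daemon thread and stays alive
    // until the executor is shut down, regardless of what the upload itself does.
    private final ExecutorService executor = Executors.newFixedThreadPool(1);

    // Stand-in for the file that saveTransferState would write to (assumption).
    private final ConcurrentLinkedQueue<String> saved = new ConcurrentLinkedQueue<>();

    /** Mirrors onPersistableTransfer: hand the save off to the background thread. */
    void onPersistableTransfer(String state) {
        executor.submit(() -> { saved.add(state); });
    }

    int savedCount() {
        return saved.size();
    }

    boolean isTerminated() {
        return executor.isTerminated();
    }

    /** Stop accepting work, let queued saves finish, then release the worker thread. */
    void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up on saves that are still pending
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        // Each iteration plays the role of one upload with its own listener.
        for (int i = 0; i < 3; i++) {
            StateSavingListener listener = new StateSavingListener();
            try {
                listener.onPersistableTransfer("state-" + i);
            } finally {
                listener.close(); // without this, one thread is left behind per upload
            }
            System.out.println("upload " + i + ": worker terminated = " + listener.isTerminated());
        }
    }
}
```

In the upload code from the question, the equivalent step would be to keep a reference to the listener's executor and shut it down once upload.waitForUploadResult() has returned, for example in the finally block next to xferManager.shutdownNow(true). That placement is a suggestion rather than something the SDK prescribes.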