将 AWS S3 TransferManager 与 MultipartUpload 和 S3ProgressListener 用于 ResumableTransfer 时打开的进程太多
Too many open processes when using AWS S3 TransferManager with MultipartUpload and S3ProgressListener for ResumableTransfer
我们已经实现了 AWS TransferManager 与 MultipartUpload 和 ResumableTransfer 用于文件上传。
按照以下实施解决方案:
https://aws.amazon.com/blogs/developer/pausing-and-resuming-transfers-using-transfer-manager/
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-s3-transfermanager.html
https://aws.amazon.com/blogs/mobile/pause-and-resume-amazon-s3-transfers-using-the-aws-mobile-sdk-for-android/
在没有 MultipartUpload 和 ResumableTransfer 的情况下上传文件时,进程数得到了很好的控制,但是一旦我们实施了上述方法,进程数就开始呈指数增长。
下面的示例代码:
try {
AmazonS3 s3client = s3ClientFactory.createClient();
xferManager = TransferManagerBuilder.standard()
.withS3Client(s3client)
.withMinimumUploadPartSize(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
.withMultipartUploadThreshold(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
.withExecutorFactory(() -> Executors.newFixedThreadPool(3))
.build();
String resumableTargetFile ="/path/to/resumableTargetFile";
Upload upload = xferManager.upload(putRequest, new S3ProgressListener() {
ExecutorService executor = Executors.newFixedThreadPool(1);
@Override
public void progressChanged(ProgressEvent progressEvent) {
double pct = progressEvent.getBytesTransferred() * 100.0 / progressEvent.getBytes();
LOGGER.info("Upload status for file - " + fileName + " is: " + Double.toString(pct) + "%");
switch (progressEvent.getEventType()) {
case TRANSFER_STARTED_EVENT:
LOGGER.info("Started uploading file {} to S3", fileName);
break;
case TRANSFER_COMPLETED_EVENT:
LOGGER.info("Completed uploading file {} to S3", fileName);
break;
case TRANSFER_CANCELED_EVENT:
LOGGER.warn("Upload of file {} to S3 was aborted", fileName);
break;
case TRANSFER_FAILED_EVENT:
LOGGER.error("Failed uploading file {} to S3", fileName);
break;
default:
break;
}
}
@Override
public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {
executor.submit(() -> {
saveTransferState(persistableTransfer, resumableTargetFile);
});
}
});
UploadResult uploadResult = upload.waitForUploadResult();
streamMD5 = uploadResult.getETag();
if (upload.isDone()) {
LOGGER.info("File {} uploaded successfully to S3 bucket {}",fileNameKey, bucketName);
}
} catch (AmazonServiceException ase) {
// The call was transmitted successfully, but Amazon S3 couldn't process
// it, so it returned an error response.
LOGGER.error("AmazonServiceException occurred: " + ase.getMessage());
} catch (SdkClientException sdce) {
// Amazon S3 couldn't be contacted for a response, or the client
// couldn't parse the response from Amazon S3.
LOGGER.error("SdkClientException occurred: " + sdce.getMessage());
} catch (AmazonClientException ace) {
LOGGER.error("AWS Exception occurred: " + ace.getMessage());
} catch (Exception e) {
LOGGER.error("Exception occurred during files processing: " + e.getMessage());
} finally {
xferManager.shutdownNow(true);
return streamMD5;
}
想看看是否有人遇到过类似的问题以及关于此问题的任何意见
尽管根据 AWS 文档,使用 TransferManager.shutdownNow(true) 关闭 TransferManager 应该会关闭 TransferManager 和相关的子对象,但我们发现在用于 ResumableTransfer 的 S3ProgressListener 中生成的 ExecutorService 从未关闭关闭 TransferManager 后。
一旦我们通过调用 executor.shutdown() 显式关闭执行程序,打开进程呈指数增长的问题就得到解决
我们已经实现了 AWS TransferManager 与 MultipartUpload 和 ResumableTransfer 用于文件上传。
按照以下实施解决方案:
https://aws.amazon.com/blogs/developer/pausing-and-resuming-transfers-using-transfer-manager/
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-s3-transfermanager.html
https://aws.amazon.com/blogs/mobile/pause-and-resume-amazon-s3-transfers-using-the-aws-mobile-sdk-for-android/
在没有 MultipartUpload 和 ResumableTransfer 的情况下上传文件时,进程数得到了很好的控制,但是一旦我们实施了上述方法,进程数就开始呈指数增长。
下面的示例代码:
try {
AmazonS3 s3client = s3ClientFactory.createClient();
xferManager = TransferManagerBuilder.standard()
.withS3Client(s3client)
.withMinimumUploadPartSize(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
.withMultipartUploadThreshold(6291456L) //6 * 1024 * 1024(long) (represents 6MB)
.withExecutorFactory(() -> Executors.newFixedThreadPool(3))
.build();
String resumableTargetFile ="/path/to/resumableTargetFile";
Upload upload = xferManager.upload(putRequest, new S3ProgressListener() {
ExecutorService executor = Executors.newFixedThreadPool(1);
@Override
public void progressChanged(ProgressEvent progressEvent) {
double pct = progressEvent.getBytesTransferred() * 100.0 / progressEvent.getBytes();
LOGGER.info("Upload status for file - " + fileName + " is: " + Double.toString(pct) + "%");
switch (progressEvent.getEventType()) {
case TRANSFER_STARTED_EVENT:
LOGGER.info("Started uploading file {} to S3", fileName);
break;
case TRANSFER_COMPLETED_EVENT:
LOGGER.info("Completed uploading file {} to S3", fileName);
break;
case TRANSFER_CANCELED_EVENT:
LOGGER.warn("Upload of file {} to S3 was aborted", fileName);
break;
case TRANSFER_FAILED_EVENT:
LOGGER.error("Failed uploading file {} to S3", fileName);
break;
default:
break;
}
}
@Override
public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {
executor.submit(() -> {
saveTransferState(persistableTransfer, resumableTargetFile);
});
}
});
UploadResult uploadResult = upload.waitForUploadResult();
streamMD5 = uploadResult.getETag();
if (upload.isDone()) {
LOGGER.info("File {} uploaded successfully to S3 bucket {}",fileNameKey, bucketName);
}
} catch (AmazonServiceException ase) {
// The call was transmitted successfully, but Amazon S3 couldn't process
// it, so it returned an error response.
LOGGER.error("AmazonServiceException occurred: " + ase.getMessage());
} catch (SdkClientException sdce) {
// Amazon S3 couldn't be contacted for a response, or the client
// couldn't parse the response from Amazon S3.
LOGGER.error("SdkClientException occurred: " + sdce.getMessage());
} catch (AmazonClientException ace) {
LOGGER.error("AWS Exception occurred: " + ace.getMessage());
} catch (Exception e) {
LOGGER.error("Exception occurred during files processing: " + e.getMessage());
} finally {
xferManager.shutdownNow(true);
return streamMD5;
}
想看看是否有人遇到过类似的问题以及关于此问题的任何意见
尽管根据 AWS 文档,使用 TransferManager.shutdownNow(true) 关闭 TransferManager 应该会关闭 TransferManager 和相关的子对象,但我们发现在用于 ResumableTransfer 的 S3ProgressListener 中生成的 ExecutorService 从未关闭关闭 TransferManager 后。
一旦我们通过调用 executor.shutdown() 显式关闭执行程序,打开进程呈指数增长的问题就得到解决