How to upload multipart to Amazon S3 asynchronously using the Java SDK
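In my Java application I need to write data to S3. I don't know the size in advance, and it is usually large, so, as recommended in the AWS S3 documentation, I am using the low-level API ("Using the AWS Java SDK (low-level API)") to write data to the S3 bucket.
In my application I provide S3BufferedOutputStream, an OutputStream implementation that other classes in the app can use to write to the S3 bucket.
I store the data in a buffer in a loop, and once the data is larger than the buffer size I upload the contents of the buffer as a single UploadPartRequest.
Here is the implementation of the write method of S3BufferedOutputStream: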
@Override
public void write(byte[] b, int off, int len) throws IOException {
    this.assertOpen();
    int o = off, l = len;
    int size;
    // While the input does not fit in the remaining buffer space,
    // fill the buffer, flush it as one part, and continue.
    while (l > (size = this.buf.length - position)) {
        System.arraycopy(b, o, this.buf, this.position, size);
        this.position += size;
        flushBufferAndRewind();
        o += size;
        l -= size;
    }
    System.arraycopy(b, o, this.buf, this.position, l);
    this.position += l;
}
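The whole implementation is similar to this: code repo
My problem is that each UploadPartRequest is done synchronously, so we have to wait for one part to finish uploading before we can upload the next one. And because I am using the AWS S3 low-level API, I cannot benefit from the parallel uploading provided by TransferManager.
Is there a way to achieve parallel uploads using the low-level SDK?
Or are there code changes that would make it work asynchronously without corrupting the uploaded data, while keeping the data in order?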
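You should consider using the AWS SDK for Java V2. You are referencing V1, not the newest Amazon S3 Java API. If you are not familiar with V2, start here:
Get started with the AWS SDK for Java 2.x
To perform asynchronous operations through the Amazon S3 Java API, you use S3AsyncClient.
To learn how to upload an object with this client, see this code example: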
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

/**
 * To run this AWS code example, ensure that you have set up your development environment, including your AWS credentials.
 *
 * For information, see this documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class S3AsyncOps {
    public static void main(String[] args) {
        final String USAGE = "\n" +
            "Usage:\n" +
            " S3AsyncOps <bucketName> <key> <path>\n\n" +
            "Where:\n" +
            " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" +
            " key - the name of the object (for example, book.pdf). \n" +
            " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ;
        if (args.length != 3) {
            System.out.println(USAGE);
            System.exit(1);
        }
        String bucketName = args[0];
        String key = args[1];
        String path = args[2];
        Region region = Region.US_WEST_2;
        S3AsyncClient client = S3AsyncClient.builder()
            .region(region)
            .build();
        PutObjectRequest objectRequest = PutObjectRequest.builder()
            .bucket(bucketName)
            .key(key)
            .build();
        // Put the object into the bucket
        CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest,
            AsyncRequestBody.fromFile(Paths.get(path))
        );
        future.whenComplete((resp, err) -> {
            try {
                if (resp != null) {
                    System.out.println("Object uploaded. Details: " + resp);
                } else {
                    // Handle error
                    err.printStackTrace();
                }
            } finally {
                // Only close the client when you are completely done with it
                client.close();
            }
        });
        future.join();
    }
}
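That is how you upload an object with the S3AsyncClient client. To perform a multipart upload, you need to use this method:
To see a multipart upload example that uses the S3 synchronous client, see:
So that is your solution: use the createMultipartUpload method of the S3AsyncClient object.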
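To make the ordering concern concrete, here is a minimal sketch of how parts can be started asynchronously and still be completed in order. It uses only the JDK: the `uploader` function is a hypothetical stand-in for a call such as `S3AsyncClient.uploadPart(...)` mapped to the response's ETag, and `AsyncPartTracker` and `Part` are names I made up for illustration, not SDK types. The idea is that the part number is fixed when the part is submitted, so it does not matter in which order the uploads finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Starts part uploads without blocking and returns their results in part order. */
final class AsyncPartTracker {

    /** One finished part: its 1-based part number and the ETag returned for it. */
    static final class Part {
        final int number;
        final String eTag;

        Part(int number, String eTag) {
            this.number = number;
            this.eTag = eTag;
        }
    }

    // Hypothetical stand-in for the SDK call: (partNumber, data) -> future ETag.
    private final BiFunction<Integer, byte[], CompletableFuture<String>> uploader;
    private final List<CompletableFuture<Part>> pending = new ArrayList<>();
    private int nextPartNumber = 1;

    AsyncPartTracker(BiFunction<Integer, byte[], CompletableFuture<String>> uploader) {
        this.uploader = uploader;
    }

    /** Starts one upload and returns immediately; the part number is fixed here. */
    synchronized void submit(byte[] data) {
        final int number = nextPartNumber++;
        pending.add(uploader.apply(number, data)
                .thenApply(eTag -> new Part(number, eTag)));
    }

    /** Blocks until every upload has finished; the list is in part-number order. */
    synchronized List<Part> awaitAll() {
        List<Part> parts = new ArrayList<>();
        for (CompletableFuture<Part> future : pending) {
            parts.add(future.join()); // a failed upload surfaces as CompletionException
        }
        return parts;
    }

    public static void main(String[] args) {
        // Simulated uploader: runs on the common pool and returns a made-up ETag.
        AsyncPartTracker tracker = new AsyncPartTracker((number, data) ->
                CompletableFuture.supplyAsync(() -> "etag-" + number));
        for (int i = 0; i < 3; i++) {
            tracker.submit(new byte[] {(byte) i});
        }
        for (Part part : tracker.awaitAll()) {
            System.out.println(part.number + " -> " + part.eTag);
        }
    }
}
```

With the real client, the list returned by `awaitAll()` would be what you turn into the completed-parts list for the final complete-multipart-upload request. Note that every submitted byte array stays in memory until its upload finishes, so you would probably want to cap the number of parts in flight.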
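Here is some example code from a class that I have. It submits the parts to an ExecutorService and holds on to the returned Future. This is written for the v1 Java SDK; if you are using the v2 SDK, you could use an async client rather than the explicit thread pool: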
// WARNING: data must not be updated by caller; make a defensive copy if needed
public synchronized void uploadPart(byte[] data, boolean isLastPart)
{
    partNumber++;
    logger.debug("submitting part {} for s3://{}/{}", partNumber, bucket, key);
    final UploadPartRequest request = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(partNumber)
        .withPartSize(data.length)
        .withInputStream(new ByteArrayInputStream(data))
        .withLastPart(isLastPart);
    futures.add(
        executor.submit(new Callable<PartETag>()
        {
            @Override
            public PartETag call() throws Exception
            {
                int localPartNumber = request.getPartNumber();
                logger.debug("uploading part {} for s3://{}/{}", localPartNumber, bucket, key);
                UploadPartResult response = client.uploadPart(request);
                String etag = response.getETag();
                logger.debug("uploaded part {} for s3://{}/{}; etag is {}", localPartNumber, bucket, key, etag);
                return new PartETag(localPartNumber, etag);
            }
        }));
}
Note: this method is synchronized to ensure that parts are not submitted out of order.
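Because the upload now runs on another thread, the stream from the question must not hand over its own buffer and then keep writing into it. The following is a rough sketch of how the question's write loop could pass each full buffer on as an independent copy. It is plain JDK code: `PartBufferingOutputStream` and `partSink` are hypothetical names, and the sink stands in for something like `data -> uploader.uploadPart(data, false)`. The last-part flag and the open check are left out for brevity.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Buffers writes and hands each filled buffer to a sink as an independent copy. */
final class PartBufferingOutputStream extends OutputStream {
    private final byte[] buf;
    // Hypothetical hook that receives one part at a time, for example an async uploader.
    private final Consumer<byte[]> partSink;
    private int position = 0;

    PartBufferingOutputStream(int partSize, Consumer<byte[]> partSink) {
        this.buf = new byte[partSize];
        this.partSink = partSink;
    }

    @Override
    public void write(int b) {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        int o = off, l = len;
        int free;
        // Same loop as in the question: fill the buffer, flush it, continue.
        while (l > (free = buf.length - position)) {
            System.arraycopy(b, o, buf, position, free);
            position += free;
            flushBufferAndRewind();
            o += free;
            l -= free;
        }
        System.arraycopy(b, o, buf, position, l);
        position += l;
    }

    /** Passes a copy of the buffered bytes to the sink, then reuses the buffer. */
    private void flushBufferAndRewind() {
        if (position == 0) {
            return; // nothing buffered, so there is no part to emit
        }
        partSink.accept(Arrays.copyOf(buf, position)); // defensive copy
        position = 0;
    }

    /** Emits whatever is left as the final, possibly shorter, part. */
    @Override
    public void close() {
        flushBufferAndRewind();
    }

    public static void main(String[] args) {
        List<byte[]> parts = new ArrayList<>();
        PartBufferingOutputStream out = new PartBufferingOutputStream(4, parts::add);
        out.write(new byte[10], 0, 10);
        out.close();
        for (byte[] part : parts) {
            System.out.println("part of " + part.length + " bytes");
        }
    }
}
```

The copy costs one extra allocation per part, but it lets the stream reuse its buffer while earlier parts are still uploading. Keep in mind that S3 requires every part except the last to be at least 5 MiB, so a real part size would be much larger than the one in `main`.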
Once you have submitted all of the parts, use this method to wait for them to finish and then complete the upload:
public void complete()
{
    logger.debug("waiting for upload tasks of s3://{}/{}", bucket, key);
    List<PartETag> partTags = new ArrayList<>();
    for (Future<PartETag> future : futures)
    {
        try
        {
            partTags.add(future.get());
        }
        catch (Exception e)
        {
            // The format string has two placeholders, so both arguments must be supplied
            throw new RuntimeException(String.format("failed to complete upload task for s3://%s/%s", bucket, key), e);
        }
    }
    logger.debug("completing multi-part upload for s3://{}/{}", bucket, key);
    CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartETags(partTags);
    client.completeMultipartUpload(request);
    logger.debug("completed multi-part upload for s3://{}/{}", bucket, key);
}
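You will also need an abort() method that cancels outstanding parts and aborts the upload. This, and the rest of the class, are left as an exercise for the reader.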
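As a starting point for that exercise, one possible shape for the cancellation half of abort() is sketched below. This is only an illustration using JDK types: `UploadAborter` is a made-up name, and `abortCall` is a hypothetical hook that, with the v1 SDK, would wrap something like `client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Cancels unfinished part uploads and then triggers the abort of the whole upload. */
final class UploadAborter {

    /**
     * Cancels every part task that has not completed yet, then runs abortCall.
     * Returns the number of tasks that were actually cancelled.
     */
    static int abort(List<? extends Future<?>> futures, Runnable abortCall) {
        int cancelled = 0;
        for (Future<?> future : futures) {
            // cancel() returns false for tasks that have already completed normally
            if (future.cancel(true)) {
                cancelled++;
            }
        }
        abortCall.run(); // hypothetical hook: tell S3 to discard the uploaded parts
        return cancelled;
    }

    public static void main(String[] args) {
        CompletableFuture<String> finished = CompletableFuture.completedFuture("etag-1");
        CompletableFuture<String> running = new CompletableFuture<>();
        int cancelled = abort(Arrays.asList(finished, running),
                () -> System.out.println("abort request sent"));
        System.out.println("cancelled tasks: " + cancelled);
    }
}
```

Cancelling a task that is already in the middle of an upload may not stop the transfer right away, so treat the cancel step as best effort and rely on the abort request to clean up on the S3 side.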