如何异步使用 boto3 在 s3 中 move/copy 文件?

How do I move/copy files in s3 using boto3 asynchronously?

我了解使用 boto3 Object.copy_from(...) 使用线程但不是异步的。是否可以使此调用异步?如果没有,是否有另一种方法可以使用 boto3 来完成此操作?我发现移动 hundreds/thousands 个文件很好,但是当我处理 100 个文件时,它变得非常慢。

你可以看看aioboto3。它是一个第三方库,不是由 AWS 创建的,但它为选定的(不是全部)AWS API 调用提供 asyncio 支持。

我认为你可以使用 boto3 和 python 线程来处理这种情况,他们在 AWS S3 Docs 中提到了

Your application can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket.

因此您可以一次调用 3500 次上传,没有什么可以超越 AWS 设置的 3500 次限制。

通过使用线程,您只需调用 300 次(大约)。

最坏情况下需要 5 小时,即考虑到您的文件很大,上传一个文件平均需要 1 分钟。

注意: 运行线程越多,您机器上的资源就越多。您必须确保您的机器有足够的资源来支持您想要的最大并发请求数。

我使用以下。您可以从命令行复制到 python 文件和 运行 中。我有一台 8 核的 PC,所以它比我的带有 1 个 VPC 的小型 EC2 实例要快。

它使用 multiprocessing 库,因此如果您不熟悉,则需要继续阅读。它相对简单。有一个我已经注释掉的批量删除,因为你真的不想不小心删除错误的目录。您可以使用任何您想列出键或遍历对象的方法,但这对我有用。

from multiprocessing import Pool
from itertools import repeat
import boto3
import os
import math
s3sc = boto3.client('s3')
s3sr = boto3.resource('s3')
num_proc = os.cpu_count()

def get_list_of_keys_from_prefix(bucket, prefix):
    """gets list of keys for given bucket and prefix"""
    keys_list = []
    paginator = s3sr.meta.client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/'):
        keys = [content['Key'] for content in page.get('Contents')]
        keys_list.extend(keys)
    if prefix in keys_list:
        keys_list.remove(prefix)
    return keys_list

def batch_delete_s3(keys_list, bucket):
    total_keys = len(keys_list)
    chunk_size = 1000
    num_batches = math.ceil(total_keys / chunk_size)
    for b in range(0, num_batches):
        batch_to_delete = []
        for k in keys_list[chunk_size*b:chunk_size*b+chunk_size]:
            batch_to_delete.append({'Key': k})
        s3sc.delete_objects(Bucket=bucket,  Delete={'Objects': batch_to_delete,'Quiet': True})

def copy_s3_to_s3(from_bucket, from_key, to_bucket, to_key):
    copy_source = {'Bucket': from_bucket, 'Key': from_key}
    s3sr.meta.client.copy(copy_source, to_bucket, to_key)

def upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=4):
    with Pool(num_proc) as pool:
        r = pool.starmap(copy_s3_to_s3, zip(repeat(from_bucket), keys_list_from, repeat(to_bucket), keys_list_to), 15)
        pool.close()
        pool.join()
    return r

if __name__ == '__main__':
    __spec__= None

    from_bucket = 'from-bucket'
    from_prefix = 'from/prefix/'
    to_bucket = 'to-bucket'
    to_prefix = 'to/prefix/'

    keys_list_from = get_list_of_keys_from_prefix(from_bucket, from_prefix)
    keys_list_to = [to_prefix + k.rsplit('/')[-1] for k in keys_list_from]

    rs = upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=num_proc)

    # batch_delete_s3(keys_list_from, from_bucket)