如何异步使用 boto3 在 s3 中 move/copy 文件?
How do I move/copy files in s3 using boto3 asynchronously?
我了解使用 boto3 Object.copy_from(...) 使用线程但不是异步的。是否可以使此调用异步?如果没有,是否有另一种方法可以使用 boto3 来完成此操作?我发现移动 hundreds/thousands 个文件很好,但是当我处理 100 个文件时,它变得非常慢。
你可以看看aioboto3。它是一个第三方库,不是由 AWS 创建的,但它为选定的(不是全部)AWS API 调用提供 asyncio
支持。
我认为你可以使用 boto3 和 python 线程来处理这种情况,他们在 AWS S3 Docs 中提到了
Your application can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket.
因此您可以一次调用 3500 次上传,没有什么可以超越 AWS 设置的 3500 次限制。
通过使用线程,您只需调用 300 次(大约)。
最坏情况下需要 5 小时,即考虑到您的文件很大,上传一个文件平均需要 1 分钟。
注意: 运行线程越多,您机器上的资源就越多。您必须确保您的机器有足够的资源来支持您想要的最大并发请求数。
我使用以下。您可以从命令行复制到 python 文件和 运行 中。我有一台 8 核的 PC,所以它比我的带有 1 个 VPC 的小型 EC2 实例要快。
它使用 multiprocessing
库,因此如果您不熟悉,则需要继续阅读。它相对简单。有一个我已经注释掉的批量删除,因为你真的不想不小心删除错误的目录。您可以使用任何您想列出键或遍历对象的方法,但这对我有用。
from multiprocessing import Pool
from itertools import repeat
import boto3
import os
import math
s3sc = boto3.client('s3')
s3sr = boto3.resource('s3')
num_proc = os.cpu_count()
def get_list_of_keys_from_prefix(bucket, prefix):
"""gets list of keys for given bucket and prefix"""
keys_list = []
paginator = s3sr.meta.client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/'):
keys = [content['Key'] for content in page.get('Contents')]
keys_list.extend(keys)
if prefix in keys_list:
keys_list.remove(prefix)
return keys_list
def batch_delete_s3(keys_list, bucket):
total_keys = len(keys_list)
chunk_size = 1000
num_batches = math.ceil(total_keys / chunk_size)
for b in range(0, num_batches):
batch_to_delete = []
for k in keys_list[chunk_size*b:chunk_size*b+chunk_size]:
batch_to_delete.append({'Key': k})
s3sc.delete_objects(Bucket=bucket, Delete={'Objects': batch_to_delete,'Quiet': True})
def copy_s3_to_s3(from_bucket, from_key, to_bucket, to_key):
copy_source = {'Bucket': from_bucket, 'Key': from_key}
s3sr.meta.client.copy(copy_source, to_bucket, to_key)
def upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=4):
with Pool(num_proc) as pool:
r = pool.starmap(copy_s3_to_s3, zip(repeat(from_bucket), keys_list_from, repeat(to_bucket), keys_list_to), 15)
pool.close()
pool.join()
return r
if __name__ == '__main__':
__spec__= None
from_bucket = 'from-bucket'
from_prefix = 'from/prefix/'
to_bucket = 'to-bucket'
to_prefix = 'to/prefix/'
keys_list_from = get_list_of_keys_from_prefix(from_bucket, from_prefix)
keys_list_to = [to_prefix + k.rsplit('/')[-1] for k in keys_list_from]
rs = upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=num_proc)
# batch_delete_s3(keys_list_from, from_bucket)
我了解使用 boto3 Object.copy_from(...) 使用线程但不是异步的。是否可以使此调用异步?如果没有,是否有另一种方法可以使用 boto3 来完成此操作?我发现移动 hundreds/thousands 个文件很好,但是当我处理 100 个文件时,它变得非常慢。
你可以看看aioboto3。它是一个第三方库,不是由 AWS 创建的,但它为选定的(不是全部)AWS API 调用提供 asyncio
支持。
我认为你可以使用 boto3 和 python 线程来处理这种情况,他们在 AWS S3 Docs 中提到了
Your application can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket.
因此您可以一次调用 3500 次上传,没有什么可以超越 AWS 设置的 3500 次限制。
通过使用线程,您只需调用 300 次(大约)。
最坏情况下需要 5 小时,即考虑到您的文件很大,上传一个文件平均需要 1 分钟。
注意: 运行线程越多,您机器上的资源就越多。您必须确保您的机器有足够的资源来支持您想要的最大并发请求数。
我使用以下。您可以从命令行复制到 python 文件和 运行 中。我有一台 8 核的 PC,所以它比我的带有 1 个 VPC 的小型 EC2 实例要快。
它使用 multiprocessing
库,因此如果您不熟悉,则需要继续阅读。它相对简单。有一个我已经注释掉的批量删除,因为你真的不想不小心删除错误的目录。您可以使用任何您想列出键或遍历对象的方法,但这对我有用。
from multiprocessing import Pool
from itertools import repeat
import boto3
import os
import math
s3sc = boto3.client('s3')
s3sr = boto3.resource('s3')
num_proc = os.cpu_count()
def get_list_of_keys_from_prefix(bucket, prefix):
"""gets list of keys for given bucket and prefix"""
keys_list = []
paginator = s3sr.meta.client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/'):
keys = [content['Key'] for content in page.get('Contents')]
keys_list.extend(keys)
if prefix in keys_list:
keys_list.remove(prefix)
return keys_list
def batch_delete_s3(keys_list, bucket):
total_keys = len(keys_list)
chunk_size = 1000
num_batches = math.ceil(total_keys / chunk_size)
for b in range(0, num_batches):
batch_to_delete = []
for k in keys_list[chunk_size*b:chunk_size*b+chunk_size]:
batch_to_delete.append({'Key': k})
s3sc.delete_objects(Bucket=bucket, Delete={'Objects': batch_to_delete,'Quiet': True})
def copy_s3_to_s3(from_bucket, from_key, to_bucket, to_key):
copy_source = {'Bucket': from_bucket, 'Key': from_key}
s3sr.meta.client.copy(copy_source, to_bucket, to_key)
def upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=4):
with Pool(num_proc) as pool:
r = pool.starmap(copy_s3_to_s3, zip(repeat(from_bucket), keys_list_from, repeat(to_bucket), keys_list_to), 15)
pool.close()
pool.join()
return r
if __name__ == '__main__':
__spec__= None
from_bucket = 'from-bucket'
from_prefix = 'from/prefix/'
to_bucket = 'to-bucket'
to_prefix = 'to/prefix/'
keys_list_from = get_list_of_keys_from_prefix(from_bucket, from_prefix)
keys_list_to = [to_prefix + k.rsplit('/')[-1] for k in keys_list_from]
rs = upload_multiprocess(from_bucket, keys_list_from, to_bucket, keys_list_to, num_proc=num_proc)
# batch_delete_s3(keys_list_from, from_bucket)