在 python 中使用 boto 将内容从一个路径递归复制到另一个 s3 存储桶
Recursively copying Content from one path to another of s3 buckets using boto in python
我无法在 python.
中找到使用 boto 将 s3 存储桶中的内容从一个存储桶递归复制到另一个存储桶的任何解决方案
假设一个 bucket B1 包含的键结构如下:
B1/x/*
我想将所有对象从 B/x/* 之类的键递归复制到 B/y/*
S3中没有"directory"。那些“/”分隔符只是对象名称的一部分,这就是为什么 boto 没有这样的功能。要么写脚本处理,要么用第三方工具。
AWS customerapps show s3browser 提供了这样的任意目录复制功能。典型的免费版本只产生两个线程来移动文件,付费版本允许您指定更多线程并且 运行 更快。
或者你直接写脚本,然后用s3.client.copy_object把文件复制到另一个名字,然后再删除。例如
import boto3
s3 = boto3.client("s3")
# list_objects_v2() give more info
more_objects=True
found_token = True
while more_objects :
if found_token :
response= s3.list_objects_v2(
Bucket="mybucket",
Prefix="B1/x/",
Delimiter="/")
else:
response= s3.list_objects_v2(
Bucket="mybucket",
ContinuationToken=found_token,
Prefix="B1/x/",
Delimiter="/")
# use copy_object or copy_from
for source in object_list["Contents"]:
raw_name = source["Key"].split("/")[-1]
new_name = "new_structure/{}".format(raw_name)
s3.copy_object(
....
)
# Now check there is more objects to list
if "NextContinuationToken" in response:
found_token = response["NextContinuationToken"]
more_objects = True
else:
more_objects = False
** 重要说明 **:list_object 仅 return 每个列表最多 1000 个键,MaxKey 不会更改限制。所以你必须使用 list_objects_v2 并检查 NextContinuationToken 是否被 returned,以确保它是更多的对象,重复它直到耗尽。
只是想在以前的答案的基础上继续努力:
s3 = boto3.client('s3')
def copyFolderFromS3(pathFrom, bucketTo, locationTo):
response = {}
response['status'] = 'failed'
getBucket = pathFrom.split('/')[2]
location = '/'.join(pathFrom.split('/')[3:])
if pathFrom.startswith('s3://'):
copy_source = { 'Bucket': getBucket, 'Key': location }
uploadKey = locationTo
recursiveCopyFolderToS3(copy_source,bucketTo,uploadKey)
def recursiveCopyFolderToS3(src,uplB,uplK):
more_objects=True
found_token = True
while more_objects:
if found_token:
response = s3.list_objects_v2(
Bucket=src['Bucket'],
Prefix=src['Key'],
Delimiter="/")
else:
response = s3.list_objects_v2(
Bucket=src['Bucket'],
ContinuationToken=found_token,
Prefix=src['Key'],
Delimiter="/")
for source in response["Contents"]:
raw_name = source["Key"].split("/")[-1]
raw_name = raw_name
new_name = os.path.join(uplK,raw_name)
if raw_name.endswith('_$folder$'):
src["Key"] = source["Key"].replace('_$folder$','/')
new_name = new_name.replace('_$folder$','')
recursiveCopyFolderToS3(src,uplB,new_name)
else:
src['Key'] = source["Key"]
s3.copy_object(CopySource=src,Bucket=uplB,Key=new_name)
if "NextContinuationToken" in response:
found_token = response["NextContinuationToken"]
more_objects = True
else:
more_objects = False
或者您也可以使用默认安装在 EC2/emr 机器上的简单 awscli。
import subprocess
cmd='aws s3 cp '+path+' '+uploadUrl+' --recursive'
p=subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE)
p.communicate()
我没有使用 boto3
,而是选择 aws-cli
和 sh
. See the aws s3 cp 文档来获取完整的参数列表,您可以将其作为 kwargs 包括在下面(根据我自己的代码修改) ) 可用于在 S3 存储桶和/或本地目标之间复制/复制:
import sh # also assumes aws-cli has been installed
def s3_cp(source, target, **kwargs):
"""
Copy data from source to target. Include flags as kwargs
such as recursive=True and include=xyz
"""
args = []
for flag_name, flag_value in kwargs.items():
if flag_value is not False: # i.e. --quiet=False means omit --quiet
args.append(f"--{flag_name}")
if flag_value is not True: # i.e. --quiet=True means --quiet
args.append(flag_value)
args += [source, target]
sh.aws("s3", "cp", *args)
桶到桶(根据 OP 的问题):
s3_cp("s3://B1/x/", "s3://B1/y/", quiet=True, recursive=True)
或存储到本地:
s3_cp("s3://B1/x/", "my-local-dir/", quiet=True, recursive=True)
我个人发现,与 boto3
相比,这种方法可以缩短传输时间(传输几 GB 超过 20k 的小文件),从几小时缩短到几分钟。也许在引擎盖下它正在做一些线程或只是打开几个连接 - 但这只是猜测。
警告:它不适用于 Windows。
相关:
我无法在 python.
中找到使用 boto 将 s3 存储桶中的内容从一个存储桶递归复制到另一个存储桶的任何解决方案假设一个 bucket B1 包含的键结构如下: B1/x/* 我想将所有对象从 B/x/* 之类的键递归复制到 B/y/*
S3中没有"directory"。那些“/”分隔符只是对象名称的一部分,这就是为什么 boto 没有这样的功能。要么写脚本处理,要么用第三方工具。
AWS customerapps show s3browser 提供了这样的任意目录复制功能。典型的免费版本只产生两个线程来移动文件,付费版本允许您指定更多线程并且 运行 更快。
或者你直接写脚本,然后用s3.client.copy_object把文件复制到另一个名字,然后再删除。例如
import boto3
s3 = boto3.client("s3")
# list_objects_v2() give more info
more_objects=True
found_token = True
while more_objects :
if found_token :
response= s3.list_objects_v2(
Bucket="mybucket",
Prefix="B1/x/",
Delimiter="/")
else:
response= s3.list_objects_v2(
Bucket="mybucket",
ContinuationToken=found_token,
Prefix="B1/x/",
Delimiter="/")
# use copy_object or copy_from
for source in object_list["Contents"]:
raw_name = source["Key"].split("/")[-1]
new_name = "new_structure/{}".format(raw_name)
s3.copy_object(
....
)
# Now check there is more objects to list
if "NextContinuationToken" in response:
found_token = response["NextContinuationToken"]
more_objects = True
else:
more_objects = False
** 重要说明 **:list_object 仅 return 每个列表最多 1000 个键,MaxKey 不会更改限制。所以你必须使用 list_objects_v2 并检查 NextContinuationToken 是否被 returned,以确保它是更多的对象,重复它直到耗尽。
只是想在以前的答案的基础上继续努力:
s3 = boto3.client('s3')
def copyFolderFromS3(pathFrom, bucketTo, locationTo):
response = {}
response['status'] = 'failed'
getBucket = pathFrom.split('/')[2]
location = '/'.join(pathFrom.split('/')[3:])
if pathFrom.startswith('s3://'):
copy_source = { 'Bucket': getBucket, 'Key': location }
uploadKey = locationTo
recursiveCopyFolderToS3(copy_source,bucketTo,uploadKey)
def recursiveCopyFolderToS3(src,uplB,uplK):
more_objects=True
found_token = True
while more_objects:
if found_token:
response = s3.list_objects_v2(
Bucket=src['Bucket'],
Prefix=src['Key'],
Delimiter="/")
else:
response = s3.list_objects_v2(
Bucket=src['Bucket'],
ContinuationToken=found_token,
Prefix=src['Key'],
Delimiter="/")
for source in response["Contents"]:
raw_name = source["Key"].split("/")[-1]
raw_name = raw_name
new_name = os.path.join(uplK,raw_name)
if raw_name.endswith('_$folder$'):
src["Key"] = source["Key"].replace('_$folder$','/')
new_name = new_name.replace('_$folder$','')
recursiveCopyFolderToS3(src,uplB,new_name)
else:
src['Key'] = source["Key"]
s3.copy_object(CopySource=src,Bucket=uplB,Key=new_name)
if "NextContinuationToken" in response:
found_token = response["NextContinuationToken"]
more_objects = True
else:
more_objects = False
或者您也可以使用默认安装在 EC2/emr 机器上的简单 awscli。
import subprocess
cmd='aws s3 cp '+path+' '+uploadUrl+' --recursive'
p=subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE)
p.communicate()
我没有使用 boto3
,而是选择 aws-cli
和 sh
. See the aws s3 cp 文档来获取完整的参数列表,您可以将其作为 kwargs 包括在下面(根据我自己的代码修改) ) 可用于在 S3 存储桶和/或本地目标之间复制/复制:
import sh # also assumes aws-cli has been installed
def s3_cp(source, target, **kwargs):
"""
Copy data from source to target. Include flags as kwargs
such as recursive=True and include=xyz
"""
args = []
for flag_name, flag_value in kwargs.items():
if flag_value is not False: # i.e. --quiet=False means omit --quiet
args.append(f"--{flag_name}")
if flag_value is not True: # i.e. --quiet=True means --quiet
args.append(flag_value)
args += [source, target]
sh.aws("s3", "cp", *args)
桶到桶(根据 OP 的问题):
s3_cp("s3://B1/x/", "s3://B1/y/", quiet=True, recursive=True)
或存储到本地:
s3_cp("s3://B1/x/", "my-local-dir/", quiet=True, recursive=True)
我个人发现,与 boto3
相比,这种方法可以缩短传输时间(传输几 GB 超过 20k 的小文件),从几小时缩短到几分钟。也许在引擎盖下它正在做一些线程或只是打开几个连接 - 但这只是猜测。
警告:它不适用于 Windows。
相关: