从 boto3 检索 S3 存储桶中的子文件夹名称
Retrieving subfolders names in S3 bucket from boto3
使用 boto3,我可以访问我的 AWS S3 存储桶:
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
现在,存储桶包含文件夹 first-level
,该文件夹本身包含几个以时间戳命名的子文件夹,例如 1456753904534
。
我需要知道我正在做的另一项工作的这些子文件夹的名称,我想知道我是否可以让 boto3 为我检索这些子文件夹。
所以我尝试了:
objs = bucket.meta.client.list_objects(Bucket='my-bucket-name')
它给出了一个字典,它的键 'Contents' 给了我所有的三级文件而不是二级时间戳目录,实际上我得到了一个包含
的列表
{u'ETag': '"etag"', u'Key': first-level/1456753904534/part-00014', u'LastModified':
datetime.datetime(2016, 2, 29, 13, 52, 24, tzinfo=tzutc()),
u'Owner': {u'DisplayName': 'owner', u'ID':
'id'},
u'Size': size, u'StorageClass': 'storageclass'}
你可以看到特定的文件,在本例中是 part-00014
被检索到,而我想单独获取目录的名称。
原则上我可以从所有路径中删除目录名称,但是在第三级检索所有内容以获得第二级是丑陋且昂贵的!
我也尝试了一些报道 here:
for o in bucket.objects.filter(Delimiter='/'):
print(o.key)
但我没有得到所需级别的文件夹。
有办法解决吗?
首先,S3中没有真正的文件夹概念。
您绝对可以有一个文件 @ '/folder/subfolder/myfile.txt'
而没有文件夹或子文件夹。
要 "simulate" S3 中的文件夹,您必须创建一个空文件并在其名称末尾添加“/”(参见 Amazon S3 boto - how to create a folder?)
对于您的问题,您可能应该使用带有 2 个参数的方法 get_all_keys
:prefix
和 delimiter
https://github.com/boto/boto/blob/develop/boto/s3/bucket.py#L427
for key in bucket.get_all_keys(prefix='first-level/', delimiter='/'):
print(key.name)
S3是对象存储,没有真正的目录结构。 “/”是相当装饰性的。
人们希望拥有目录结构的一个原因是,因为他们可以 maintain/prune/add 一棵树来应用。对于 S3,您将此类结构视为某种索引或搜索标签。
要在 S3 中操作对象,您需要 boto3.client 或 boto3.resource,例如
列出所有对象
import boto3
s3 = boto3.client("s3")
all_objects = s3.list_objects(Bucket = 'bucket-name')
http://boto3.readthedocs.org/en/latest/reference/services/s3.html#S3.Client.list_objects
事实上,如果s3对象名是使用'/'分隔符存储的。 list_objects 的更新版本 (list_objects_v2) 允许您限制对以指定前缀开头的键的响应。
要将项目限制为某些子文件夹下的项目:
import boto3
s3 = boto3.client("s3")
response = s3.list_objects_v2(
Bucket=BUCKET,
Prefix ='DIR1/DIR2',
MaxKeys=100 )
另一种选择是使用python os.path 函数提取文件夹前缀。问题是这将需要列出不需要的目录中的对象。
import os
s3_key = 'first-level/1456753904534/part-00014'
filename = os.path.basename(s3_key)
foldername = os.path.dirname(s3_key)
# if you are not using conventional delimiter like '#'
s3_key = 'first-level#1456753904534#part-00014'
filename = s3_key.split("#")[-1]
关于 boto3 的提醒:boto3.resource 是一个不错的高级别 API。使用 boto3.client 与 boto3.resource 各有利弊。如果您开发内部共享库,使用 boto3.resource 将为您提供一个覆盖所用资源的黑盒层。
下面的代码 returns 只有来自 s3 存储桶的 'folder' 中的 'subfolders'。
import boto3
bucket = 'my-bucket'
#Make sure you provide / in the end
prefix = 'prefix-name-with-slash/'
client = boto3.client('s3')
result = client.list_objects(Bucket=bucket, Prefix=prefix, Delimiter='/')
for o in result.get('CommonPrefixes'):
print 'sub folder : ', o.get('Prefix')
我花了很多时间才弄清楚,但最后这里有一个使用 boto3 列出 S3 存储桶中子文件夹内容的简单方法。希望对你有帮助
prefix = "folderone/foldertwo/"
s3 = boto3.resource('s3')
bucket = s3.Bucket(name="bucket_name_here")
FilesNotFound = True
for obj in bucket.objects.filter(Prefix=prefix):
print('{0}:{1}'.format(bucket.name, obj.key))
FilesNotFound = False
if FilesNotFound:
print("ALERT", "No file in {0}/{1}".format(bucket, prefix))
当您 运行 aws s3 ls s3://my-bucket/
时,AWS cli 会执行此操作(大概不会获取和遍历存储桶中的所有键),所以我认为必须有一种使用 boto3 的方法。
看起来他们确实使用了 Prefix 和 Delimiter - 我能够编写一个函数,通过稍微修改该代码来获取存储桶根级别的所有目录:
def list_folders_in_bucket(bucket):
paginator = boto3.client('s3').get_paginator('list_objects')
folders = []
iterator = paginator.paginate(Bucket=bucket, Prefix='', Delimiter='/', PaginationConfig={'PageSize': None})
for response_data in iterator:
prefixes = response_data.get('CommonPrefixes', [])
for prefix in prefixes:
prefix_name = prefix['Prefix']
if prefix_name.endswith('/'):
folders.append(prefix_name.rstrip('/'))
return folders
以下对我有用... S3 对象:
s3://bucket/
form1/
section11/
file111
file112
section12/
file121
form2/
section21/
file211
file112
section22/
file221
file222
...
...
...
使用:
from boto3.session import Session
s3client = session.client('s3')
resp = s3client.list_objects(Bucket=bucket, Prefix='', Delimiter="/")
forms = [x['Prefix'] for x in resp['CommonPrefixes']]
我们得到:
form1/
form2/
...
有:
resp = s3client.list_objects(Bucket=bucket, Prefix='form1/', Delimiter="/")
sections = [x['Prefix'] for x in resp['CommonPrefixes']]
我们得到:
form1/section11/
form1/section12/
我遇到了同样的问题,但使用 boto3.client
和 list_objects_v2
以及 Bucket
和 StartAfter
参数设法解决了它。
s3client = boto3.client('s3')
bucket = 'my-bucket-name'
startAfter = 'firstlevelFolder/secondLevelFolder'
theobjects = s3client.list_objects_v2(Bucket=bucket, StartAfter=startAfter )
for object in theobjects['Contents']:
print object['Key']
以上代码的输出结果将显示如下:
firstlevelFolder/secondLevelFolder/item1
firstlevelFolder/secondLevelFolder/item2
Boto3 list_objects_v2 Documentation
为了仅去除 secondLevelFolder
的目录名称,我使用了 python 方法 split()
:
s3client = boto3.client('s3')
bucket = 'my-bucket-name'
startAfter = 'firstlevelFolder/secondLevelFolder'
theobjects = s3client.list_objects_v2(Bucket=bucket, StartAfter=startAfter )
for object in theobjects['Contents']:
direcoryName = object['Key'].encode("string_escape").split('/')
print direcoryName[1]
以上代码的输出结果将显示如下:
secondLevelFolder
secondLevelFolder
如果您想获取目录名称和内容项名称,请将打印行替换为以下内容:
print "{}/{}".format(fileName[1], fileName[2])
然后输出如下:
secondLevelFolder/item2
secondLevelFolder/item2
希望对您有所帮助
S3 的最大实现是没有 folders/directories 键。 明显的文件夹结构只是在文件名 前面加上 'Key',因此要列出 myBucket
的 some/path/to/the/file/
的内容,您可以尝试:
s3 = boto3.client('s3')
for obj in s3.list_objects_v2(Bucket="myBucket", Prefix="some/path/to/the/file/")['Contents']:
print(obj['Key'])
这会给你类似的东西:
some/path/to/the/file/yo.jpg
some/path/to/the/file/meAndYou.gif
...
简答:
使用Delimiter='/'
。这避免了对您的存储桶进行递归列表。这里的一些答案错误地建议做一个完整的列表并使用一些字符串操作来检索目录名称。这可能非常低效。请记住,S3 实际上对一个存储桶可以包含的对象数量没有限制。因此,想象一下,在 bar/
和 foo/
之间,您有一万亿个对象:您将等待很长时间才能获得 ['bar/', 'foo/']
.
使用Paginators
。出于同样的原因(S3 是工程师对无穷大的近似),您 必须 列出页面并避免将所有列表存储在内存中。相反,将您的“lister”视为迭代器,并处理它产生的流。
使用boto3.client
, not boto3.resource
。 resource
版本似乎不能很好地处理 Delimiter
选项。如果你有一个资源,比如 bucket = boto3.resource('s3').Bucket(name)
,你可以得到相应的客户端:bucket.meta.client
.
长答案:
以下是我用于简单存储桶(无版本处理)的迭代器。
import os
import boto3
from collections import namedtuple
from operator import attrgetter
S3Obj = namedtuple('S3Obj', ['key', 'mtime', 'size', 'ETag'])
def s3list(bucket, path, start=None, end=None, recursive=True, list_dirs=True,
list_objs=True, limit=None):
"""
Iterator that lists a bucket's objects under path, (optionally) starting with
start and ending before end.
If recursive is False, then list only the "depth=0" items (dirs and objects).
If recursive is True, then list recursively all objects (no dirs).
Args:
bucket:
a boto3.resource('s3').Bucket().
path:
a directory in the bucket.
start:
optional: start key, inclusive (may be a relative path under path, or
absolute in the bucket)
end:
optional: stop key, exclusive (may be a relative path under path, or
absolute in the bucket)
recursive:
optional, default True. If True, lists only objects. If False, lists
only depth 0 "directories" and objects.
list_dirs:
optional, default True. Has no effect in recursive listing. On
non-recursive listing, if False, then directories are omitted.
list_objs:
optional, default True. If False, then directories are omitted.
limit:
optional. If specified, then lists at most this many items.
Returns:
an iterator of S3Obj.
Examples:
# set up
>>> s3 = boto3.resource('s3')
... bucket = s3.Bucket('bucket-name')
# iterate through all S3 objects under some dir
>>> for p in s3list(bucket, 'some/dir'):
... print(p)
# iterate through up to 20 S3 objects under some dir, starting with foo_0010
>>> for p in s3list(bucket, 'some/dir', limit=20, start='foo_0010'):
... print(p)
# non-recursive listing under some dir:
>>> for p in s3list(bucket, 'some/dir', recursive=False):
... print(p)
# non-recursive listing under some dir, listing only dirs:
>>> for p in s3list(bucket, 'some/dir', recursive=False, list_objs=False):
... print(p)
"""
kwargs = dict()
if start is not None:
if not start.startswith(path):
start = os.path.join(path, start)
# note: need to use a string just smaller than start, because
# the list_object API specifies that start is excluded (the first
# result is *after* start).
kwargs.update(Marker=__prev_str(start))
if end is not None:
if not end.startswith(path):
end = os.path.join(path, end)
if not recursive:
kwargs.update(Delimiter='/')
if not path.endswith('/'):
path += '/'
kwargs.update(Prefix=path)
if limit is not None:
kwargs.update(PaginationConfig={'MaxItems': limit})
paginator = bucket.meta.client.get_paginator('list_objects')
for resp in paginator.paginate(Bucket=bucket.name, **kwargs):
q = []
if 'CommonPrefixes' in resp and list_dirs:
q = [S3Obj(f['Prefix'], None, None, None) for f in resp['CommonPrefixes']]
if 'Contents' in resp and list_objs:
q += [S3Obj(f['Key'], f['LastModified'], f['Size'], f['ETag']) for f in resp['Contents']]
# note: even with sorted lists, it is faster to sort(a+b)
# than heapq.merge(a, b) at least up to 10K elements in each list
q = sorted(q, key=attrgetter('key'))
if limit is not None:
q = q[:limit]
limit -= len(q)
for p in q:
if end is not None and p.key >= end:
return
yield p
def __prev_str(s):
if len(s) == 0:
return s
s, c = s[:-1], ord(s[-1])
if c > 0:
s += chr(c - 1)
s += ''.join(['\u7FFF' for _ in range(10)])
return s
测试:
以下内容有助于测试 paginator
和 list_objects
的行为。它创建了许多目录和文件。由于页面最多有 1000 个条目,我们将其中的倍数用于目录和文件。 dirs
仅包含目录(每个目录都有一个对象)。 mixed
包含目录和对象的混合,每个目录有 2 个对象的比例(当然在目录下加上一个对象;S3 只存储对象)。
import concurrent
def genkeys(top='tmp/test', n=2000):
for k in range(n):
if k % 100 == 0:
print(k)
for name in [
os.path.join(top, 'dirs', f'{k:04d}_dir', 'foo'),
os.path.join(top, 'mixed', f'{k:04d}_dir', 'foo'),
os.path.join(top, 'mixed', f'{k:04d}_foo_a'),
os.path.join(top, 'mixed', f'{k:04d}_foo_b'),
]:
yield name
with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
executor.map(lambda name: bucket.put_object(Key=name, Body='hi\n'.encode()), genkeys())
结果结构是:
./dirs/0000_dir/foo
./dirs/0001_dir/foo
./dirs/0002_dir/foo
...
./dirs/1999_dir/foo
./mixed/0000_dir/foo
./mixed/0000_foo_a
./mixed/0000_foo_b
./mixed/0001_dir/foo
./mixed/0001_foo_a
./mixed/0001_foo_b
./mixed/0002_dir/foo
./mixed/0002_foo_a
./mixed/0002_foo_b
...
./mixed/1999_dir/foo
./mixed/1999_foo_a
./mixed/1999_foo_b
稍微修改上面为 s3list
提供的代码以检查来自 paginator
的响应,您可以观察到一些有趣的事实:
Marker
真是独家。给定 Marker=topdir + 'mixed/0500_foo_a'
将使列表在 之后 该键开始(根据 AmazonS3 API),即 .../mixed/0500_foo_b
。这就是 __prev_str()
.
的原因
使用Delimiter
,当列出mixed/
时,来自paginator
的每个响应包含666个键和334个公共前缀。它非常擅长不建立大量响应。
相比之下,当列出 dirs/
时,来自 paginator
的每个响应都包含 1000 个公共前缀(没有键)。
通过PaginationConfig={'MaxItems': limit}
形式的限制只限制键的数量,不限制公共前缀。我们通过进一步截断迭代器的流来处理这个问题。
为什么不使用 s3path
包,它与使用 pathlib
一样方便?但是,如果您必须使用 boto3
:
使用boto3.resource
这建立在 之上以应用可选的 limit
。它显然比 boto3.client
版本简单得多。
import logging
from typing import List, Optional
import boto3
from boto3_type_annotations.s3 import ObjectSummary # pip install boto3_type_annotations
log = logging.getLogger(__name__)
_S3_RESOURCE = boto3.resource("s3")
def s3_list(bucket_name: str, prefix: str, *, limit: Optional[int] = None) -> List[ObjectSummary]:
"""Return a list of S3 object summaries."""
# Ref:
return list(_S3_RESOURCE.Bucket(bucket_name).objects.limit(count=limit).filter(Prefix=prefix))
if __name__ == "__main__":
s3_list("noaa-gefs-pds", "gefs.20190828/12/pgrb2a", limit=10_000)
使用boto3.client
这使用 list_objects_v2
并建立在 之上以允许检索超过 1000 个对象。
import logging
from typing import cast, List
import boto3
log = logging.getLogger(__name__)
_S3_CLIENT = boto3.client("s3")
def s3_list(bucket_name: str, prefix: str, *, limit: int = cast(int, float("inf"))) -> List[dict]:
"""Return a list of S3 object summaries."""
# Ref:
contents: List[dict] = []
continuation_token = None
if limit <= 0:
return contents
while True:
max_keys = min(1000, limit - len(contents))
request_kwargs = {"Bucket": bucket_name, "Prefix": prefix, "MaxKeys": max_keys}
if continuation_token:
log.info( # type: ignore
"Listing %s objects in s3://%s/%s using continuation token ending with %s with %s objects listed thus far.",
max_keys, bucket_name, prefix, continuation_token[-6:], len(contents)) # pylint: disable=unsubscriptable-object
response = _S3_CLIENT.list_objects_v2(**request_kwargs, ContinuationToken=continuation_token)
else:
log.info("Listing %s objects in s3://%s/%s with %s objects listed thus far.", max_keys, bucket_name, prefix, len(contents))
response = _S3_CLIENT.list_objects_v2(**request_kwargs)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
contents.extend(response["Contents"])
is_truncated = response["IsTruncated"]
if (not is_truncated) or (len(contents) >= limit):
break
continuation_token = response["NextContinuationToken"]
assert len(contents) <= limit
log.info("Returning %s objects from s3://%s/%s.", len(contents), bucket_name, prefix)
return contents
if __name__ == "__main__":
s3_list("noaa-gefs-pds", "gefs.20190828/12/pgrb2a", limit=10_000)
我知道 boto3 是这里讨论的主题,但我发现简单地使用 awscli 通常更快更直观 - awscli 保留了比 boto3 更多的功能。
例如,如果我在 "subfolders" 中保存了与给定存储桶关联的对象,我可以将它们全部列出来,例如:
1) 'mydata' = bucket name
2) 'f1/f2/f3' = "path" leading to "files" or objects
3) 'foo2.csv, barfar.segy, gar.tar' = all objects "inside" f3
所以,我们可以认为通向这些对象的"absolute path"是:
'mydata/f1/f2/f3/foo2.csv'...
使用 awscli 命令,我们可以通过以下方式轻松列出给定 "subfolder" 内的所有对象:
aws s3 ls s3://mydata/f1/f2/f3/ --recursive
如果您尝试获取大量 S3 存储桶对象,以下是可以处理分页的代码片段:
def get_matching_s3_objects(bucket, prefix="", suffix=""):
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
kwargs = {'Bucket': bucket}
# We can pass the prefix directly to the S3 API. If the user has passed
# a tuple or list of prefixes, we go through them one by one.
if isinstance(prefix, str):
prefixes = (prefix, )
else:
prefixes = prefix
for key_prefix in prefixes:
kwargs["Prefix"] = key_prefix
for page in paginator.paginate(**kwargs):
try:
contents = page["Contents"]
except KeyError:
return
for obj in contents:
key = obj["Key"]
if key.endswith(suffix):
yield obj
至于 Boto 1.13.3,它变得如此简单(如果您跳过所有分页注意事项,其他答案中已涵盖):
def get_sub_paths(bucket, prefix):
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
MaxKeys=1000)
return [item["Prefix"] for item in response['CommonPrefixes']]
这是一个可能的解决方案:
def download_list_s3_folder(my_bucket,my_folder):
import boto3
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=my_bucket,
Prefix=my_folder,
MaxKeys=1000)
return [item["Key"] for item in response['Contents']]
使用递归方法列出 S3 存储桶中的所有不同路径。
def common_prefix(bucket_name,paths,prefix=''):
client = boto3.client('s3')
paginator = client.get_paginator('list_objects')
result = paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
for prefix in result.search('CommonPrefixes'):
if prefix == None:
break
paths.append(prefix.get('Prefix'))
common_prefix(bucket_name,paths,prefix.get('Prefix'))
这对我来说效果很好,只检索存储桶下的一级文件夹:
client = boto3.client('s3')
bucket = 'my-bucket-name'
folders = set()
for prefix in client.list_objects(Bucket=bucket, Delimiter='/')['CommonPrefixes']:
folders.add(prefix['Prefix'][:-1])
print(folders)
您可以对列表而不是集合执行相同的操作,因为文件夹名称是唯一的
要列出的“目录”并不是真正的对象,而是对象键的子字符串,因此它们不会出现在 objects.filter
方法中。您可以在此处使用客户端的 list_objects
并指定前缀。
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
res = bucket.meta.client.list_objects(Bucket=bucket.name, Delimiter='/', Prefix = 'sub-folder/')
for o in res.get('CommonPrefixes'):
print(o.get('Prefix'))
这个问题的一些很好的答案。
我一直在使用 boto3 资源 objects.filter
方法来获取所有文件。
objects.filter
方法 returns 作为迭代器,速度非常快。
虽然转换成list比较费时间
list_objects_v2
returns 实际内容而不是迭代器。
但是您需要循环获取所有内容,因为它的大小限制为 1000。
为了只获取文件夹,我像这样应用列表理解
[x.split('/')[index] for x in files]
以下是各种方法所花费的时间。
运行 这些测试时的文件数为 125077。
%%timeit
s3 = boto3.resource('s3')
response = s3.Bucket('bucket').objects.filter(Prefix='foo/bar/')
3.95 ms ± 17.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
s3 = boto3.resource('s3')
response = s3.Bucket('foo').objects.filter(Prefix='foo/bar/')
files = list(response)
26.6 s ± 1.08 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='bucket', Prefix='foo/bar/')
files = response['Contents']
while 'NextContinuationToken' in response:
response = s3.list_objects_v2(Bucket='bucket', Prefix='foo/bar/', ContinuationToken=response['NextContinuationToken'])
files.extend(response['Contents'])
22.8 s ± 1.11 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
是的,如前所述,重要的是S3中没有真正的文件夹概念。但是让我们看看 S3 可以实现哪些技巧 API.
下面的例子是对@cem的回答的解决方案的改进。
除了@cem 的解决方案之外,此解决方案还使用了 S3 分页器 API。即使结果包含超过 1000 个对象,该解决方案也会收集所有结果。 S3 分页器 API 自动解析从 1001 到 2000 等的下一个结果。
在此示例中,列出了名为“lala”的特定“文件夹”下的所有“子文件夹”(键)(没有该子文件夹的递归结构)。
Prefix='lala/' 和 Delimiter="/" 参数发挥了作用。
# given "folder/key" structure
# .
# ├── lorem.txt
# ├─── lala
# │ ├── folder1
# │ │ ├── file1.txt
# │ │ └── file2.txt
# │ ├── folder2
# │ │ └── file1.txt
# │ └── folder3
# │ └── file1.txt
# └── lorem
# └── folder4
# ├── file1.txt
# └── file2.txt
import boto3
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
# Execute paginated list_objects_v2
response = paginator.paginate(Bucket='your-bucket-name', Prefix='lala/', Delimiter="/")
# Get prefix for each page result
names = []
for page in response:
names.extend([x['Prefix'] for x in page.get('CommonPrefixes', [])])
print(names)
# Result is:
# ['lala/folder1/','lala/folder2/','lala/folder3/']
使用 boto3,我可以访问我的 AWS S3 存储桶:
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
现在,存储桶包含文件夹 first-level
,该文件夹本身包含几个以时间戳命名的子文件夹,例如 1456753904534
。
我需要知道我正在做的另一项工作的这些子文件夹的名称,我想知道我是否可以让 boto3 为我检索这些子文件夹。
所以我尝试了:
objs = bucket.meta.client.list_objects(Bucket='my-bucket-name')
它给出了一个字典,它的键 'Contents' 给了我所有的三级文件而不是二级时间戳目录,实际上我得到了一个包含
的列表{u'ETag': '"etag"', u'Key': first-level/1456753904534/part-00014', u'LastModified': datetime.datetime(2016, 2, 29, 13, 52, 24, tzinfo=tzutc()),
u'Owner': {u'DisplayName': 'owner', u'ID': 'id'},
u'Size': size, u'StorageClass': 'storageclass'}
你可以看到特定的文件,在本例中是 part-00014
被检索到,而我想单独获取目录的名称。
原则上我可以从所有路径中删除目录名称,但是在第三级检索所有内容以获得第二级是丑陋且昂贵的!
我也尝试了一些报道 here:
for o in bucket.objects.filter(Delimiter='/'):
print(o.key)
但我没有得到所需级别的文件夹。
有办法解决吗?
首先,S3中没有真正的文件夹概念。
您绝对可以有一个文件 @ '/folder/subfolder/myfile.txt'
而没有文件夹或子文件夹。
要 "simulate" S3 中的文件夹,您必须创建一个空文件并在其名称末尾添加“/”(参见 Amazon S3 boto - how to create a folder?)
对于您的问题,您可能应该使用带有 2 个参数的方法 get_all_keys
:prefix
和 delimiter
https://github.com/boto/boto/blob/develop/boto/s3/bucket.py#L427
for key in bucket.get_all_keys(prefix='first-level/', delimiter='/'):
print(key.name)
S3是对象存储,没有真正的目录结构。 “/”是相当装饰性的。 人们希望拥有目录结构的一个原因是,因为他们可以 maintain/prune/add 一棵树来应用。对于 S3,您将此类结构视为某种索引或搜索标签。
要在 S3 中操作对象,您需要 boto3.client 或 boto3.resource,例如 列出所有对象
import boto3
s3 = boto3.client("s3")
all_objects = s3.list_objects(Bucket = 'bucket-name')
http://boto3.readthedocs.org/en/latest/reference/services/s3.html#S3.Client.list_objects
事实上,如果s3对象名是使用'/'分隔符存储的。 list_objects 的更新版本 (list_objects_v2) 允许您限制对以指定前缀开头的键的响应。
要将项目限制为某些子文件夹下的项目:
import boto3
s3 = boto3.client("s3")
response = s3.list_objects_v2(
Bucket=BUCKET,
Prefix ='DIR1/DIR2',
MaxKeys=100 )
另一种选择是使用python os.path 函数提取文件夹前缀。问题是这将需要列出不需要的目录中的对象。
import os
s3_key = 'first-level/1456753904534/part-00014'
filename = os.path.basename(s3_key)
foldername = os.path.dirname(s3_key)
# if you are not using conventional delimiter like '#'
s3_key = 'first-level#1456753904534#part-00014'
filename = s3_key.split("#")[-1]
关于 boto3 的提醒:boto3.resource 是一个不错的高级别 API。使用 boto3.client 与 boto3.resource 各有利弊。如果您开发内部共享库,使用 boto3.resource 将为您提供一个覆盖所用资源的黑盒层。
下面的代码 returns 只有来自 s3 存储桶的 'folder' 中的 'subfolders'。
import boto3
bucket = 'my-bucket'
#Make sure you provide / in the end
prefix = 'prefix-name-with-slash/'
client = boto3.client('s3')
result = client.list_objects(Bucket=bucket, Prefix=prefix, Delimiter='/')
for o in result.get('CommonPrefixes'):
print 'sub folder : ', o.get('Prefix')
我花了很多时间才弄清楚,但最后这里有一个使用 boto3 列出 S3 存储桶中子文件夹内容的简单方法。希望对你有帮助
prefix = "folderone/foldertwo/"
s3 = boto3.resource('s3')
bucket = s3.Bucket(name="bucket_name_here")
FilesNotFound = True
for obj in bucket.objects.filter(Prefix=prefix):
print('{0}:{1}'.format(bucket.name, obj.key))
FilesNotFound = False
if FilesNotFound:
print("ALERT", "No file in {0}/{1}".format(bucket, prefix))
当您 运行 aws s3 ls s3://my-bucket/
时,AWS cli 会执行此操作(大概不会获取和遍历存储桶中的所有键),所以我认为必须有一种使用 boto3 的方法。
看起来他们确实使用了 Prefix 和 Delimiter - 我能够编写一个函数,通过稍微修改该代码来获取存储桶根级别的所有目录:
def list_folders_in_bucket(bucket):
paginator = boto3.client('s3').get_paginator('list_objects')
folders = []
iterator = paginator.paginate(Bucket=bucket, Prefix='', Delimiter='/', PaginationConfig={'PageSize': None})
for response_data in iterator:
prefixes = response_data.get('CommonPrefixes', [])
for prefix in prefixes:
prefix_name = prefix['Prefix']
if prefix_name.endswith('/'):
folders.append(prefix_name.rstrip('/'))
return folders
以下对我有用... S3 对象:
s3://bucket/
form1/
section11/
file111
file112
section12/
file121
form2/
section21/
file211
file112
section22/
file221
file222
...
...
...
使用:
from boto3.session import Session
s3client = session.client('s3')
resp = s3client.list_objects(Bucket=bucket, Prefix='', Delimiter="/")
forms = [x['Prefix'] for x in resp['CommonPrefixes']]
我们得到:
form1/
form2/
...
有:
resp = s3client.list_objects(Bucket=bucket, Prefix='form1/', Delimiter="/")
sections = [x['Prefix'] for x in resp['CommonPrefixes']]
我们得到:
form1/section11/
form1/section12/
我遇到了同样的问题,但使用 boto3.client
和 list_objects_v2
以及 Bucket
和 StartAfter
参数设法解决了它。
s3client = boto3.client('s3')
bucket = 'my-bucket-name'
startAfter = 'firstlevelFolder/secondLevelFolder'
theobjects = s3client.list_objects_v2(Bucket=bucket, StartAfter=startAfter )
for object in theobjects['Contents']:
print object['Key']
以上代码的输出结果将显示如下:
firstlevelFolder/secondLevelFolder/item1
firstlevelFolder/secondLevelFolder/item2
Boto3 list_objects_v2 Documentation
为了仅去除 secondLevelFolder
的目录名称,我使用了 python 方法 split()
:
s3client = boto3.client('s3')
bucket = 'my-bucket-name'
startAfter = 'firstlevelFolder/secondLevelFolder'
theobjects = s3client.list_objects_v2(Bucket=bucket, StartAfter=startAfter )
for object in theobjects['Contents']:
direcoryName = object['Key'].encode("string_escape").split('/')
print direcoryName[1]
以上代码的输出结果将显示如下:
secondLevelFolder
secondLevelFolder
如果您想获取目录名称和内容项名称,请将打印行替换为以下内容:
print "{}/{}".format(fileName[1], fileName[2])
然后输出如下:
secondLevelFolder/item2
secondLevelFolder/item2
希望对您有所帮助
S3 的最大实现是没有 folders/directories 键。 明显的文件夹结构只是在文件名 前面加上 'Key',因此要列出 myBucket
的 some/path/to/the/file/
的内容,您可以尝试:
s3 = boto3.client('s3')
for obj in s3.list_objects_v2(Bucket="myBucket", Prefix="some/path/to/the/file/")['Contents']:
print(obj['Key'])
这会给你类似的东西:
some/path/to/the/file/yo.jpg
some/path/to/the/file/meAndYou.gif
...
简答:
使用
Delimiter='/'
。这避免了对您的存储桶进行递归列表。这里的一些答案错误地建议做一个完整的列表并使用一些字符串操作来检索目录名称。这可能非常低效。请记住,S3 实际上对一个存储桶可以包含的对象数量没有限制。因此,想象一下,在bar/
和foo/
之间,您有一万亿个对象:您将等待很长时间才能获得['bar/', 'foo/']
.使用
Paginators
。出于同样的原因(S3 是工程师对无穷大的近似),您 必须 列出页面并避免将所有列表存储在内存中。相反,将您的“lister”视为迭代器,并处理它产生的流。使用
boto3.client
, notboto3.resource
。resource
版本似乎不能很好地处理Delimiter
选项。如果你有一个资源,比如bucket = boto3.resource('s3').Bucket(name)
,你可以得到相应的客户端:bucket.meta.client
.
长答案:
以下是我用于简单存储桶(无版本处理)的迭代器。
import os
import boto3
from collections import namedtuple
from operator import attrgetter
S3Obj = namedtuple('S3Obj', ['key', 'mtime', 'size', 'ETag'])
def s3list(bucket, path, start=None, end=None, recursive=True, list_dirs=True,
list_objs=True, limit=None):
"""
Iterator that lists a bucket's objects under path, (optionally) starting with
start and ending before end.
If recursive is False, then list only the "depth=0" items (dirs and objects).
If recursive is True, then list recursively all objects (no dirs).
Args:
bucket:
a boto3.resource('s3').Bucket().
path:
a directory in the bucket.
start:
optional: start key, inclusive (may be a relative path under path, or
absolute in the bucket)
end:
optional: stop key, exclusive (may be a relative path under path, or
absolute in the bucket)
recursive:
optional, default True. If True, lists only objects. If False, lists
only depth 0 "directories" and objects.
list_dirs:
optional, default True. Has no effect in recursive listing. On
non-recursive listing, if False, then directories are omitted.
list_objs:
optional, default True. If False, then directories are omitted.
limit:
optional. If specified, then lists at most this many items.
Returns:
an iterator of S3Obj.
Examples:
# set up
>>> s3 = boto3.resource('s3')
... bucket = s3.Bucket('bucket-name')
# iterate through all S3 objects under some dir
>>> for p in s3list(bucket, 'some/dir'):
... print(p)
# iterate through up to 20 S3 objects under some dir, starting with foo_0010
>>> for p in s3list(bucket, 'some/dir', limit=20, start='foo_0010'):
... print(p)
# non-recursive listing under some dir:
>>> for p in s3list(bucket, 'some/dir', recursive=False):
... print(p)
# non-recursive listing under some dir, listing only dirs:
>>> for p in s3list(bucket, 'some/dir', recursive=False, list_objs=False):
... print(p)
"""
kwargs = dict()
if start is not None:
if not start.startswith(path):
start = os.path.join(path, start)
# note: need to use a string just smaller than start, because
# the list_object API specifies that start is excluded (the first
# result is *after* start).
kwargs.update(Marker=__prev_str(start))
if end is not None:
if not end.startswith(path):
end = os.path.join(path, end)
if not recursive:
kwargs.update(Delimiter='/')
if not path.endswith('/'):
path += '/'
kwargs.update(Prefix=path)
if limit is not None:
kwargs.update(PaginationConfig={'MaxItems': limit})
paginator = bucket.meta.client.get_paginator('list_objects')
for resp in paginator.paginate(Bucket=bucket.name, **kwargs):
q = []
if 'CommonPrefixes' in resp and list_dirs:
q = [S3Obj(f['Prefix'], None, None, None) for f in resp['CommonPrefixes']]
if 'Contents' in resp and list_objs:
q += [S3Obj(f['Key'], f['LastModified'], f['Size'], f['ETag']) for f in resp['Contents']]
# note: even with sorted lists, it is faster to sort(a+b)
# than heapq.merge(a, b) at least up to 10K elements in each list
q = sorted(q, key=attrgetter('key'))
if limit is not None:
q = q[:limit]
limit -= len(q)
for p in q:
if end is not None and p.key >= end:
return
yield p
def __prev_str(s):
if len(s) == 0:
return s
s, c = s[:-1], ord(s[-1])
if c > 0:
s += chr(c - 1)
s += ''.join(['\u7FFF' for _ in range(10)])
return s
测试:
以下内容有助于测试 paginator
和 list_objects
的行为。它创建了许多目录和文件。由于页面最多有 1000 个条目,我们将其中的倍数用于目录和文件。 dirs
仅包含目录(每个目录都有一个对象)。 mixed
包含目录和对象的混合,每个目录有 2 个对象的比例(当然在目录下加上一个对象;S3 只存储对象)。
import concurrent
def genkeys(top='tmp/test', n=2000):
for k in range(n):
if k % 100 == 0:
print(k)
for name in [
os.path.join(top, 'dirs', f'{k:04d}_dir', 'foo'),
os.path.join(top, 'mixed', f'{k:04d}_dir', 'foo'),
os.path.join(top, 'mixed', f'{k:04d}_foo_a'),
os.path.join(top, 'mixed', f'{k:04d}_foo_b'),
]:
yield name
with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
executor.map(lambda name: bucket.put_object(Key=name, Body='hi\n'.encode()), genkeys())
结果结构是:
./dirs/0000_dir/foo
./dirs/0001_dir/foo
./dirs/0002_dir/foo
...
./dirs/1999_dir/foo
./mixed/0000_dir/foo
./mixed/0000_foo_a
./mixed/0000_foo_b
./mixed/0001_dir/foo
./mixed/0001_foo_a
./mixed/0001_foo_b
./mixed/0002_dir/foo
./mixed/0002_foo_a
./mixed/0002_foo_b
...
./mixed/1999_dir/foo
./mixed/1999_foo_a
./mixed/1999_foo_b
稍微修改上面为 s3list
提供的代码以检查来自 paginator
的响应,您可以观察到一些有趣的事实:
的原因Marker
真是独家。给定Marker=topdir + 'mixed/0500_foo_a'
将使列表在 之后 该键开始(根据 AmazonS3 API),即.../mixed/0500_foo_b
。这就是__prev_str()
.使用
Delimiter
,当列出mixed/
时,来自paginator
的每个响应包含666个键和334个公共前缀。它非常擅长不建立大量响应。相比之下,当列出
dirs/
时,来自paginator
的每个响应都包含 1000 个公共前缀(没有键)。通过
PaginationConfig={'MaxItems': limit}
形式的限制只限制键的数量,不限制公共前缀。我们通过进一步截断迭代器的流来处理这个问题。
为什么不使用 s3path
包,它与使用 pathlib
一样方便?但是,如果您必须使用 boto3
:
使用boto3.resource
这建立在 limit
。它显然比 boto3.client
版本简单得多。
import logging
from typing import List, Optional
import boto3
from boto3_type_annotations.s3 import ObjectSummary # pip install boto3_type_annotations
log = logging.getLogger(__name__)
_S3_RESOURCE = boto3.resource("s3")
def s3_list(bucket_name: str, prefix: str, *, limit: Optional[int] = None) -> List[ObjectSummary]:
"""Return a list of S3 object summaries."""
# Ref:
return list(_S3_RESOURCE.Bucket(bucket_name).objects.limit(count=limit).filter(Prefix=prefix))
if __name__ == "__main__":
s3_list("noaa-gefs-pds", "gefs.20190828/12/pgrb2a", limit=10_000)
使用boto3.client
这使用 list_objects_v2
并建立在
import logging
from typing import cast, List
import boto3
log = logging.getLogger(__name__)
_S3_CLIENT = boto3.client("s3")
def s3_list(bucket_name: str, prefix: str, *, limit: int = cast(int, float("inf"))) -> List[dict]:
"""Return a list of S3 object summaries."""
# Ref:
contents: List[dict] = []
continuation_token = None
if limit <= 0:
return contents
while True:
max_keys = min(1000, limit - len(contents))
request_kwargs = {"Bucket": bucket_name, "Prefix": prefix, "MaxKeys": max_keys}
if continuation_token:
log.info( # type: ignore
"Listing %s objects in s3://%s/%s using continuation token ending with %s with %s objects listed thus far.",
max_keys, bucket_name, prefix, continuation_token[-6:], len(contents)) # pylint: disable=unsubscriptable-object
response = _S3_CLIENT.list_objects_v2(**request_kwargs, ContinuationToken=continuation_token)
else:
log.info("Listing %s objects in s3://%s/%s with %s objects listed thus far.", max_keys, bucket_name, prefix, len(contents))
response = _S3_CLIENT.list_objects_v2(**request_kwargs)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
contents.extend(response["Contents"])
is_truncated = response["IsTruncated"]
if (not is_truncated) or (len(contents) >= limit):
break
continuation_token = response["NextContinuationToken"]
assert len(contents) <= limit
log.info("Returning %s objects from s3://%s/%s.", len(contents), bucket_name, prefix)
return contents
if __name__ == "__main__":
s3_list("noaa-gefs-pds", "gefs.20190828/12/pgrb2a", limit=10_000)
我知道 boto3 是这里讨论的主题,但我发现简单地使用 awscli 通常更快更直观 - awscli 保留了比 boto3 更多的功能。
例如,如果我在 "subfolders" 中保存了与给定存储桶关联的对象,我可以将它们全部列出来,例如:
1) 'mydata' = bucket name
2) 'f1/f2/f3' = "path" leading to "files" or objects
3) 'foo2.csv, barfar.segy, gar.tar' = all objects "inside" f3
所以,我们可以认为通向这些对象的"absolute path"是: 'mydata/f1/f2/f3/foo2.csv'...
使用 awscli 命令,我们可以通过以下方式轻松列出给定 "subfolder" 内的所有对象:
aws s3 ls s3://mydata/f1/f2/f3/ --recursive
如果您尝试获取大量 S3 存储桶对象,以下是可以处理分页的代码片段:
def get_matching_s3_objects(bucket, prefix="", suffix=""):
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
kwargs = {'Bucket': bucket}
# We can pass the prefix directly to the S3 API. If the user has passed
# a tuple or list of prefixes, we go through them one by one.
if isinstance(prefix, str):
prefixes = (prefix, )
else:
prefixes = prefix
for key_prefix in prefixes:
kwargs["Prefix"] = key_prefix
for page in paginator.paginate(**kwargs):
try:
contents = page["Contents"]
except KeyError:
return
for obj in contents:
key = obj["Key"]
if key.endswith(suffix):
yield obj
至于 Boto 1.13.3,它变得如此简单(如果您跳过所有分页注意事项,其他答案中已涵盖):
def get_sub_paths(bucket, prefix):
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
MaxKeys=1000)
return [item["Prefix"] for item in response['CommonPrefixes']]
这是一个可能的解决方案:
def download_list_s3_folder(my_bucket,my_folder):
import boto3
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=my_bucket,
Prefix=my_folder,
MaxKeys=1000)
return [item["Key"] for item in response['Contents']]
使用递归方法列出 S3 存储桶中的所有不同路径。
def common_prefix(bucket_name,paths,prefix=''):
client = boto3.client('s3')
paginator = client.get_paginator('list_objects')
result = paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
for prefix in result.search('CommonPrefixes'):
if prefix == None:
break
paths.append(prefix.get('Prefix'))
common_prefix(bucket_name,paths,prefix.get('Prefix'))
这对我来说效果很好,只检索存储桶下的一级文件夹:
client = boto3.client('s3')
bucket = 'my-bucket-name'
folders = set()
for prefix in client.list_objects(Bucket=bucket, Delimiter='/')['CommonPrefixes']:
folders.add(prefix['Prefix'][:-1])
print(folders)
您可以对列表而不是集合执行相同的操作,因为文件夹名称是唯一的
要列出的“目录”并不是真正的对象,而是对象键的子字符串,因此它们不会出现在 objects.filter
方法中。您可以在此处使用客户端的 list_objects
并指定前缀。
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
res = bucket.meta.client.list_objects(Bucket=bucket.name, Delimiter='/', Prefix = 'sub-folder/')
for o in res.get('CommonPrefixes'):
print(o.get('Prefix'))
这个问题的一些很好的答案。
我一直在使用 boto3 资源 objects.filter
方法来获取所有文件。
objects.filter
方法 returns 作为迭代器,速度非常快。
虽然转换成list比较费时间
list_objects_v2
returns 实际内容而不是迭代器。
但是您需要循环获取所有内容,因为它的大小限制为 1000。
为了只获取文件夹,我像这样应用列表理解
[x.split('/')[index] for x in files]
以下是各种方法所花费的时间。
运行 这些测试时的文件数为 125077。
%%timeit
s3 = boto3.resource('s3')
response = s3.Bucket('bucket').objects.filter(Prefix='foo/bar/')
3.95 ms ± 17.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
s3 = boto3.resource('s3')
response = s3.Bucket('foo').objects.filter(Prefix='foo/bar/')
files = list(response)
26.6 s ± 1.08 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='bucket', Prefix='foo/bar/')
files = response['Contents']
while 'NextContinuationToken' in response:
response = s3.list_objects_v2(Bucket='bucket', Prefix='foo/bar/', ContinuationToken=response['NextContinuationToken'])
files.extend(response['Contents'])
22.8 s ± 1.11 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
是的,如前所述,重要的是S3中没有真正的文件夹概念。但是让我们看看 S3 可以实现哪些技巧 API.
下面的例子是对@cem的回答的解决方案的改进。
除了@cem 的解决方案之外,此解决方案还使用了 S3 分页器 API。即使结果包含超过 1000 个对象,该解决方案也会收集所有结果。 S3 分页器 API 自动解析从 1001 到 2000 等的下一个结果。
在此示例中,列出了名为“lala”的特定“文件夹”下的所有“子文件夹”(键)(没有该子文件夹的递归结构)。
Prefix='lala/' 和 Delimiter="/" 参数发挥了作用。
# given "folder/key" structure
# .
# ├── lorem.txt
# ├─── lala
# │ ├── folder1
# │ │ ├── file1.txt
# │ │ └── file2.txt
# │ ├── folder2
# │ │ └── file1.txt
# │ └── folder3
# │ └── file1.txt
# └── lorem
# └── folder4
# ├── file1.txt
# └── file2.txt
import boto3
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
# Execute paginated list_objects_v2
response = paginator.paginate(Bucket='your-bucket-name', Prefix='lala/', Delimiter="/")
# Get prefix for each page result
names = []
for page in response:
names.extend([x['Prefix'] for x in page.get('CommonPrefixes', [])])
print(names)
# Result is:
# ['lala/folder1/','lala/folder2/','lala/folder3/']