How do I find out all the IAM permissions necessary to run this function?
I have 'written' a function that copies objects from one location in an S3 bucket to another location in the same bucket. (It is actually mostly code lifted from s3cmd.)
import sys
import time
from copy import copy
import logging

from S3.Exceptions import *
from S3.S3 import S3
from S3.Config import Config
from S3.FileDict import FileDict
from S3.S3Uri import S3Uri
from S3.Utils import *
from S3.FileLists import *
from S3.ExitCodes import EX_OK

LOG = logging.getLogger()

# Unfortunately the s3cmd implementation uses a global instance of the config
# object everywhere
cfg = None


def init_s3_cfg(access_key, secret_key):
    global cfg
    cfg = Config(access_key=access_key, secret_key=secret_key)


def cmd_sync_remote2remote(str_from_path, str_destination_base):
    '''
    This function is adapted from the s3cmd project https://github.com/s3tools/s3cmd
    because boto does not support recursive copy out of the box
    :param str_from_path:
    :param str_destination_base:
    :return:
    '''
    s3 = S3(cfg)
    LOG.info(s3.config.bucket_location)

    # Normalise s3://uri (e.g. assert trailing slash)
    from_path = S3Uri(str_from_path).uri()
    destination_base = S3Uri(str_destination_base).uri()
    LOG.info("from %s to %s" % (from_path, destination_base))

    src_list, src_exclude_list = fetch_remote_list(s3, from_path,
                                                   recursive=True, require_attribs=True)
    dst_list, dst_exclude_list = fetch_remote_list(s3, destination_base,
                                                   recursive=True, require_attribs=True)

    src_count = len(src_list)
    dst_count = len(dst_list)
    LOG.info(u"Found %d source files, %d destination files" %
             (src_count, dst_count))

    src_list, dst_list, update_list, copy_pairs = compare_filelists(
        src_list, dst_list, src_remote=True, dst_remote=True)

    src_count = len(src_list)
    update_count = len(update_list)
    dst_count = len(dst_list)
    LOG.info(u"Summary: %d source files to copy, %d files at destination to delete"
             % (src_count, dst_count))

    # Populate 'target_uri' only if we've got something to sync from src to dst
    for key in src_list:
        src_list[key]['target_uri'] = destination_base + key
    for key in update_list:
        update_list[key]['target_uri'] = destination_base + key

    def _upload(src_list, seq, src_count):
        file_list = src_list.keys()
        file_list.sort()
        for file in file_list:
            seq += 1
            item = src_list[file]
            src_uri = S3Uri(item['object_uri_str'])
            dst_uri = S3Uri(item['target_uri'])
            extra_headers = copy(cfg.extra_headers)
            try:
                _response = s3.object_copy(src_uri, dst_uri, extra_headers)
                LOG.info("File %(src)s copied to %(dst)s" % {"src": src_uri, "dst": dst_uri})
            except S3Error, e:
                LOG.error("File %(src)s could not be copied: %(e)s" % {"src": src_uri, "e": e})
        return seq

    # Perform the synchronization of files
    timestamp_start = time.time()
    seq = 0
    seq = _upload(src_list, seq, src_count + update_count)
    seq = _upload(update_list, seq, src_count + update_count)

    n_copied, bytes_saved, failed_copy_files = remote_copy(s3, copy_pairs, destination_base)

    # Process files not copied
    LOG.debug("Process files that were not remote copied")
    failed_copy_count = len(failed_copy_files)
    for key in failed_copy_files:
        failed_copy_files[key]['target_uri'] = destination_base + key
    seq = _upload(failed_copy_files, seq, src_count + update_count + failed_copy_count)

    total_elapsed = max(1.0, time.time() - timestamp_start)
    outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq / total_elapsed)
    LOG.info(outstr)

    return EX_OK


def remote_copy(s3, copy_pairs, destination_base):
    saved_bytes = 0
    failed_copy_list = FileDict()
    for (src_obj, dst1, dst2) in copy_pairs:
        LOG.debug(u"Remote Copying from %s to %s" % (dst1, dst2))
        dst1_uri = S3Uri(destination_base + dst1)
        dst2_uri = S3Uri(destination_base + dst2)
        extra_headers = copy(cfg.extra_headers)
        try:
            s3.object_copy(dst1_uri, dst2_uri, extra_headers)
            info = s3.object_info(dst2_uri)
            saved_bytes = saved_bytes + long(info['headers']['content-length'])
            LOG.info(u"remote copy: %s -> %s" % (dst1, dst2))
        except:
            LOG.warning(u'Unable to remote copy files %s -> %s' % (dst1_uri, dst2_uri))
            failed_copy_list[dst2] = src_obj
    return (len(copy_pairs), saved_bytes, failed_copy_list)
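For context, this is roughly how I call it (a sketch; the access keys and the bucket paths below are placeholders, not the real values):

init_s3_cfg("AKIA-EXAMPLE-KEY", "example-secret-key")  # credentials of the restricted IAM user
cmd_sync_remote2remote("s3://target-bucket/path/to/source/",
                       "s3://target-bucket/path/to/destination/")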
This works fine if the S3 key has full S3 permissions. However, I want to call this function with an IAM user that has only a subset of the permissions. This is my current group policy:
{
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        }
    ]
}
With this new policy I get the following error message:
ERROR:root:S3 error: Access Denied
I would like to know:
1) Is there an easy way (e.g. a parameter or an environment variable) to find out which permissions are missing? Can S3 report which permissions a request requires, and if so, how do I get at that information?
2) Can anyone identify the missing permission(s) by reading the code, or otherwise?
In answer to #2: you are probably getting the error in the fetch_remote_list() call, i.e. it needs s3:ListBucket. Notably, ListBucket applies to the bucket resource itself, not to the object paths within the bucket (a subtle but important distinction), so you need a second statement such as:
{
    "Action": [
        "s3:ListBucket"
    ],
    "Effect": "Allow",
    "Resource": [
        "arn:aws:s3:::target-bucket"
    ]
}
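Putting the two statements together, the complete group policy would look something like this (same bucket and actions as above; the second Sid is just an illustrative name):

{
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        },
        {
            "Sid": "listBucket",
            "Action": [
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket"
            ]
        }
    ]
}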