How do I find out all the IAM permissions I need to run this function?

I have "written" a function that copies objects from one location in an S3 bucket to another location in the same bucket. (In reality it is mostly code lifted from s3cmd.)

import sys
import time
from copy import copy
import logging
from S3.Exceptions import *
from S3.S3 import S3
from S3.Config import Config
from S3.FileDict import FileDict
from S3.S3Uri import S3Uri
from S3.Utils import *
from S3.FileLists import *
from S3.ExitCodes import EX_OK


LOG = logging.getLogger()


# Unfortunately the s3cmd implementation uses a global instance of config object
# everywhere
cfg = None


def init_s3_cfg(access_key, secret_key):
    global cfg
    cfg = Config(access_key=access_key, secret_key=secret_key)

def cmd_sync_remote2remote(str_from_path, str_destination_base):
    '''
    This function is adopted from s3cmd project https://github.com/s3tools/s3cmd
     because boto does not support recursive copy out of the box
    :param str_from_path:
    :param str_destination_base:
    :return:
    '''
    s3 = S3(cfg)
    LOG.info(s3.config.bucket_location)

    # Normalise s3://uri (e.g. assert trailing slash)
    from_path = S3Uri(str_from_path).uri()
    destination_base = S3Uri(str_destination_base).uri()

    LOG.info("from %s to %s" % (from_path, destination_base))

    src_list, src_exclude_list = fetch_remote_list(s3, from_path,
                                    recursive=True, require_attribs=True)
    dst_list, dst_exclude_list = fetch_remote_list(s3, destination_base,
                                    recursive=True, require_attribs=True)

    src_count = len(src_list)
    dst_count = len(dst_list)

    LOG.info(u"Found %d source files, %d destination files" %
             (src_count, dst_count))

    src_list, dst_list, update_list, copy_pairs = compare_filelists(src_list,
                                dst_list, src_remote=True, dst_remote=True)

    src_count = len(src_list)
    update_count = len(update_list)
    dst_count = len(dst_list)

    LOG.info(u"Summary: %d source files to copy, %d files at destination to delete"
             % (src_count, dst_count))

    # Populate 'target_uri' only if we've got something to sync from src to dst
    for key in src_list:
        src_list[key]['target_uri'] = destination_base + key
    for key in update_list:
        update_list[key]['target_uri'] = destination_base + key

    def _upload(src_list, seq, src_count):
        file_list = src_list.keys()
        file_list.sort()
        for file in file_list:
            seq += 1
            item = src_list[file]
            src_uri = S3Uri(item['object_uri_str'])
            dst_uri = S3Uri(item['target_uri'])
            extra_headers = copy(cfg.extra_headers)
            try:
                _response = s3.object_copy(src_uri, dst_uri, extra_headers)
                LOG.info("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
            except S3Error, e:
                LOG.error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
        return seq

    # Perform the synchronization of files
    timestamp_start = time.time()
    seq = 0
    seq = _upload(src_list, seq, src_count + update_count)
    seq = _upload(update_list, seq, src_count + update_count)
    n_copied, bytes_saved, failed_copy_files = remote_copy(s3, copy_pairs, destination_base)

    # Process files not copied
    debug("Process files that was not remote copied")
    failed_copy_count = len (failed_copy_files)
    for key in failed_copy_files:
        failed_copy_files[key]['target_uri'] = destination_base + key
    seq = _upload(failed_copy_files, seq, src_count + update_count + failed_copy_count)

    total_elapsed = max(1.0, time.time() - timestamp_start)
    outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq/total_elapsed)
    LOG.info(outstr)

    return EX_OK

def remote_copy(s3, copy_pairs, destination_base):
    saved_bytes = 0
    failed_copy_list = FileDict()
    for (src_obj, dst1, dst2) in copy_pairs:
        LOG.debug(u"Remote Copying from %s to %s" % (dst1, dst2))
        dst1_uri = S3Uri(destination_base + dst1)
        dst2_uri = S3Uri(destination_base + dst2)
        extra_headers = copy(cfg.extra_headers)
        try:
            s3.object_copy(dst1_uri, dst2_uri, extra_headers)
            info = s3.object_info(dst2_uri)
            saved_bytes = saved_bytes + long(info['headers']['content-length'])
            LOG.info(u"remote copy: %s -> %s" % (dst1, dst2))
        except:
            LOG.warning(u'Unable to remote copy files %s -> %s' % (dst1_uri, dst2_uri))
            failed_copy_list[dst2] = src_obj
    return (len(copy_pairs), saved_bytes, failed_copy_list)
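
For completeness, the function is invoked roughly like this (the credentials and bucket paths below are placeholders):

init_s3_cfg("MY_ACCESS_KEY", "MY_SECRET_KEY")  # placeholder credentials
rc = cmd_sync_remote2remote("s3://target-bucket/source/prefix/",
                            "s3://target-bucket/destination/prefix/")
assert rc == EX_OK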

This works fine if the S3 key has all S3 permissions. However, I want to call this function with an IAM user that has only a subset of those permissions. This is my current group policy:

{ 
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        }
    ]
}

With this new policy, I get the following error message:

ERROR:root:S3 error: Access Denied

I would like to know:

1) Is there an easy way to find out which permissions are missing (e.g. some parameter or an env var)? Can S3 report which permissions are required, and if so, how do I find out? (A crude probing sketch follows this list.)

2) Can anyone identify the missing permission(s) by reading the code, or by some other means?
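
One crude way to narrow this down (sketched here for reference) is to run each s3cmd call the function makes in isolation under the restricted credentials and see which one raises S3Error. Bucket names, prefixes and credentials below are placeholders:

from copy import copy
from S3.Exceptions import S3Error
from S3.S3 import S3
from S3.Config import Config
from S3.S3Uri import S3Uri
from S3.FileLists import fetch_remote_list

cfg = Config(access_key="RESTRICTED_ACCESS_KEY", secret_key="RESTRICTED_SECRET_KEY")
s3 = S3(cfg)

try:
    # Listing the source prefix needs s3:ListBucket on the bucket resource.
    fetch_remote_list(s3, "s3://target-bucket/source/prefix/",
                      recursive=True, require_attribs=True)
    print "fetch_remote_list: OK"
except S3Error, e:
    print "fetch_remote_list denied: %s" % e

try:
    # Copying a single object needs s3:GetObject on the source key and
    # s3:PutObject on the destination key.
    s3.object_copy(S3Uri("s3://target-bucket/source/prefix/example.txt"),
                   S3Uri("s3://target-bucket/destination/prefix/example.txt"),
                   copy(cfg.extra_headers))
    print "object_copy: OK"
except S3Error, e:
    print "object_copy denied: %s" % e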

In answer to #2: you are probably failing in the fetch_remote_list() call, which requires s3:ListBucket. Note that ListBucket applies to the bucket resource itself, not to object paths within the bucket (a subtle but important distinction). So you need a second statement, something like:

    {
        "Action": [
            "s3:ListBucket"
        ],
        "Effect": "Allow",
        "Resource": [
            "arn:aws:s3:::target-bucket"
        ]
    }
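
Combined with your existing statement, the whole group policy would then look something like this:

{
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        },
        {
            "Action": [
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket"
            ]
        }
    ]
}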