为此 S3 函数编写单元测试

Writing unit test for this S3 function

我有一个定义如下的 S3 函数:

def get_unprocessed_s3_object_list(s3_client, bucket: str) -> List[str]:
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=bucket)

    all_files = {}

    for page in page_iterator:
        if page["KeyCount"] > 0:
            for item in page["Contents"]:
                all_files[item["Key"]] = item["LastModified"]

    return sorted(all_files, key=lambda k: k[1])

现在,我希望为此编写一个单元测试,并尝试使用 botocore 中的 stubber 对象来测试它。我试过类似的东西:

def test_unprocessed_s3_objects():
    client = boto3.client("s3")
    stubber = Stubber(client)

    list_objects_v2_response = {
        "Contents": [
            {
                "Key": "sensor_1",
                "LastModified": "2021-11-30T12:58:14+00:00",
                "ETag": '"3a3c5ca43d2f01dba42314c1ca7e2237"',
                "Size": 1417,
                "StorageClass": "STANDARD",
            },
            {
                "Key": "sensor_2",
                "LastModified": "2021-11-30T12:58:05+00:00",
                "ETag": '"02bc99343bbdcbdefc9fe691b6f7deaa"',
                "Size": 1332,
                "StorageClass": "STANDARD",
            },
        ]
    }

    stubber.add_response("list_objects_v2", list_objects_v2_response)
    items = get_unprocessed_s3_object_list(client, "test")
    assert len(items) == 2

然而,这会返回 Access Denied with ListObjectsV2 ooperation。也许这与存储桶 ID 有关。我认为这会绕过实际调用。

调用stubber.add_response("list_objects_v2", list_objects_v2_response)后,需要调用stubber.activate()才能使用stubber。