Lambda function fails to instantiate when using AWS CDK

I am trying to use AWS CDK to deploy a Lambda function that will be triggered by S3 upload events. When I run cdk ls or cdk synth, I get the following error:

Traceback (most recent call last):
  File "app.py", line 14, in <module>
    S3TosqsStack(app, "S3TosqsStack", env=core.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')))
  File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/jsii/_runtime.py", line 83, in __call__
    inst = super().__call__(*args, **kwargs)
  File "/home/ec2-user/s3tosqs/s3tosqs/s3tosqs_stack.py", line 37, in __init__
    bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
  File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/typing_extensions.py", line 1548, in _no_init
    raise TypeError('Protocols cannot be instantiated')
TypeError: Protocols cannot be instantiated
Subprocess exited with error 1

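The problem appears to be the bucket definition for the Lambda function, but I don't understand what the error means, since I followed the documentation. I tried using bucket_arn instead of bucket_name, but that didn't work either.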

Here is the main stack code:

s3tosqs_stack.py

from aws_cdk import (
    aws_s3 as s3,
    aws_s3_notifications as s3_notifications,
    aws_sqs as sqs,
    aws_lambda as _lambda
)

from aws_cdk import core

# User-specified Parameters
lambda_deployment_bucket = 'some-deployment-bucket'
trigger_bucket = 'some-trigger-bucket'
trigger_key = 'uploads'
queue_name = 'some-queue.fifo'
region = 'us-west-2'

class S3TosqsStack(core.Stack):

    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)


        # Defines an SQS queue resource
        queue = sqs.Queue(
            self, 'NotificationQueue',
            queue_name=queue_name,
            content_based_deduplication=True,
            visibility_timeout=core.Duration.seconds(300)
        )

        # Defines an AWS Lambda resource
        lambda_function = _lambda.Function(
            self, 'S3toSQS',
            runtime=_lambda.Runtime.PYTHON_3_8,
            code=_lambda.Code.from_bucket(
                bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
                key='S3toSQS.zip'),
            handler='handler.publish_SQS_message',
            environment={'SOURCE_BUCKET': trigger_bucket,
                        'REGION': region,
                        'QUEUE_NAME': queue_name}
        )

        # Define S3 upload bucket for Lambda trigger
        upload_bucket = s3.Bucket(
            self, 'S3TriggerBucket',
            bucket_name=trigger_bucket
        )

        upload_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3_notifications.LambdaDestination(lambda_function),
            s3.NotificationKeyFilter(
                prefix=trigger_key)
        )

Have you tried passing a bucket reference created with Bucket.from_bucket_name?

        lambda_function = _lambda.Function(
            self, 'S3toSQS',
            runtime=_lambda.Runtime.PYTHON_3_8,
            code=_lambda.Code.from_bucket(
                # Reference the existing deployment bucket by name rather than
                # trying to instantiate the s3.IBucket interface directly.
                bucket=s3.Bucket.from_bucket_name(self, "id", bucket_name=lambda_deployment_bucket),
                key='S3toSQS.zip'),
            handler='handler.publish_SQS_message',
            environment={'SOURCE_BUCKET': trigger_bucket,
                         'REGION': region,
                         'QUEUE_NAME': queue.queue_name}
        )
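
For context on the error message itself, here is a minimal sketch in plain Python with no CDK involved. It assumes Python 3.8 or later for typing.Protocol; the traceback in the question shows typing_extensions raising the same error on 3.7. The names IBucketLike, ImportedBucket, try_instantiate_interface and describe are hypothetical stand-ins, not part of aws_cdk. The sketch only illustrates the general pattern: an interface declared as a Protocol cannot be constructed, while a concrete class with a factory method can be passed wherever the interface is expected.

```python
from typing import Protocol


class IBucketLike(Protocol):
    """Hypothetical stand-in for an interface type such as s3.IBucket."""

    bucket_name: str


class ImportedBucket:
    """Hypothetical concrete class that satisfies IBucketLike."""

    def __init__(self, bucket_name: str) -> None:
        self.bucket_name = bucket_name

    @classmethod
    def from_bucket_name(cls, bucket_name: str) -> "ImportedBucket":
        # Factory method: returns a usable object that refers to a bucket by name.
        return cls(bucket_name)


def try_instantiate_interface() -> str:
    """Attempt to construct the Protocol directly and report what happens."""
    try:
        IBucketLike(bucket_name="some-deployment-bucket")
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "no error"


def describe(bucket: IBucketLike) -> str:
    """Accept anything that structurally matches the interface."""
    return bucket.bucket_name


if __name__ == "__main__":
    print(try_instantiate_interface())
    print(describe(ImportedBucket.from_bucket_name("some-deployment-bucket")))
```

In this sketch the direct call on the interface fails, and the factory call succeeds, which mirrors the difference between s3.IBucket(...) and s3.Bucket.from_bucket_name(...) in the snippets above.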

There could be two reasons for the issue you are facing.

  1. Modify your code as shown below. The change is on line 4, in how the arguments to the "from_bucket" method are passed.
        lambda_function = _lambda.Function(
            self, 'S3toSQS',
            runtime=_lambda.Runtime.PYTHON_3_8,
            code=_lambda.Code.from_bucket(s3.IBucket(bucket_name=lambda_deployment_bucket), 'S3toSQS.zip'),
            handler='handler.publish_SQS_message',
            environment={'SOURCE_BUCKET': trigger_bucket,
                         'REGION': region,
                         'QUEUE_NAME': queue_name}
        )
  2. You are using Python 3.7 to run your CDK code. As far as I know, that version of Python does not support the Protocol type, hence the error. Try upgrading to Python 3.8.