使用 AWS CDK 时 Lambda 函数无法实例化
Lambda function fails to instantiate when using AWS CDK
我正在尝试使用 AWS CDK 部署将由 S3 上传事件触发的 Lambda 函数。当我尝试执行 cdk ls
或 cdk synth
时,出现错误:
Traceback (most recent call last):
File "app.py", line 14, in <module>
S3TosqsStack(app, "S3TosqsStack", env=core.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')))
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/jsii/_runtime.py", line 83, in __call__
inst = super().__call__(*args, **kwargs)
File "/home/ec2-user/s3tosqs/s3tosqs/s3tosqs_stack.py", line 37, in __init__
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/typing_extensions.py", line 1548, in _no_init
raise TypeError('Protocols cannot be instantiated')
TypeError: Protocols cannot be instantiated
Subprocess exited with error 1
看起来问题出在 lambda 函数的 bucket
定义上,但我不明白错误是什么,因为我遵循了文档。我尝试使用 bucket_arn
而不是 bucket_name
,但这也不起作用。
这里是主要的堆栈代码:
s3tosqs_stack.py
from aws_cdk import (
aws_s3 as s3,
aws_s3_notifications as s3_notifications,
aws_sqs as sqs,
aws_lambda as _lambda
)
from aws_cdk import core
# User-specified Parameters
lambda_deployment_bucket = 'some-deployment-bucket'
trigger_bucket = 'some-trigger-bucket'
trigger_key = 'uploads'
queue_name = 'some-queue.fifo'
region = 'us-west-2'
class S3TosqsStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Defines an SQS queue resource
queue = sqs.Queue(
self, 'NotificationQueue',
queue_name=queue_name,
content_based_deduplication=True,
visibility_timeout=core.Duration.seconds(300)
)
# Defines an AWS Lambda resource
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
key='S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue_name}
)
# Define S3 upload bucket for Lambda trigger
upload_bucket = s3.Bucket(
self, 'S3TriggerBucket',
bucket_name=trigger_bucket
)
upload_bucket.add_event_notification(
s3.EventType.OBJECT_CREATED,
s3_notifications.LambdaDestination(lambda_function),
s3.NotificationKeyFilter(
prefix=trigger_key)
)
您是否尝试过使用 Bucket 和 from_bucket_name 使用 bucket 引用?
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(
bucket=s3.Bucket.from_bucket_name(self, "id",bucket_name=lambda_deployment_bucket),
key='S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue.queue_name}
)
您遇到的问题可能有两个原因。
- 修改您的代码,如下所示。修改在第 4 行,以调用“from_bucket”方法的参数的方式。
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(s3.IBucket(bucket_name=lambda_deployment_bucket),'S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue_name}
)
- 您正在使用 Python 3.7 到 运行 您的 CDK 代码。据我所知,该版本的 Python 不支持协议类型,因此出现错误。尝试升级到 Python 3.8
我正在尝试使用 AWS CDK 部署将由 S3 上传事件触发的 Lambda 函数。当我尝试执行 cdk ls
或 cdk synth
时,出现错误:
Traceback (most recent call last):
File "app.py", line 14, in <module>
S3TosqsStack(app, "S3TosqsStack", env=core.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')))
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/jsii/_runtime.py", line 83, in __call__
inst = super().__call__(*args, **kwargs)
File "/home/ec2-user/s3tosqs/s3tosqs/s3tosqs_stack.py", line 37, in __init__
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
File "/home/ec2-user/s3tosqs/.venv/lib64/python3.7/site-packages/typing_extensions.py", line 1548, in _no_init
raise TypeError('Protocols cannot be instantiated')
TypeError: Protocols cannot be instantiated
Subprocess exited with error 1
看起来问题出在 lambda 函数的 bucket
定义上,但我不明白错误是什么,因为我遵循了文档。我尝试使用 bucket_arn
而不是 bucket_name
,但这也不起作用。
这里是主要的堆栈代码:
s3tosqs_stack.py
from aws_cdk import (
aws_s3 as s3,
aws_s3_notifications as s3_notifications,
aws_sqs as sqs,
aws_lambda as _lambda
)
from aws_cdk import core
# User-specified Parameters
lambda_deployment_bucket = 'some-deployment-bucket'
trigger_bucket = 'some-trigger-bucket'
trigger_key = 'uploads'
queue_name = 'some-queue.fifo'
region = 'us-west-2'
class S3TosqsStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Defines an SQS queue resource
queue = sqs.Queue(
self, 'NotificationQueue',
queue_name=queue_name,
content_based_deduplication=True,
visibility_timeout=core.Duration.seconds(300)
)
# Defines an AWS Lambda resource
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(
bucket=s3.IBucket(bucket_name=lambda_deployment_bucket),
key='S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue_name}
)
# Define S3 upload bucket for Lambda trigger
upload_bucket = s3.Bucket(
self, 'S3TriggerBucket',
bucket_name=trigger_bucket
)
upload_bucket.add_event_notification(
s3.EventType.OBJECT_CREATED,
s3_notifications.LambdaDestination(lambda_function),
s3.NotificationKeyFilter(
prefix=trigger_key)
)
您是否尝试过使用 Bucket 和 from_bucket_name 使用 bucket 引用?
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(
bucket=s3.Bucket.from_bucket_name(self, "id",bucket_name=lambda_deployment_bucket),
key='S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue.queue_name}
)
您遇到的问题可能有两个原因。
- 修改您的代码,如下所示。修改在第 4 行,以调用“from_bucket”方法的参数的方式。
lambda_function = _lambda.Function(
self, 'S3toSQS',
runtime=_lambda.Runtime.PYTHON_3_8,
code=_lambda.Code.from_bucket(s3.IBucket(bucket_name=lambda_deployment_bucket),'S3toSQS.zip'),
handler='handler.publish_SQS_message',
environment={'SOURCE_BUCKET': trigger_bucket,
'REGION': region,
'QUEUE_NAME': queue_name}
)
- 您正在使用 Python 3.7 到 运行 您的 CDK 代码。据我所知,该版本的 Python 不支持协议类型,因此出现错误。尝试升级到 Python 3.8