需要有关 CloudFormation 模板和 AWS lambda 的帮助,以便通过 lambda 将事件从 SQS 拉到 S3
Need help on CloudFormation template and AWS lambda for pulling events from SQS to S3 via lambda
我是 AWS CloudFormation 的新手,我正在尝试从 SQS 队列中捕获事件并通过 AWS lambda 将它们放入 S3 存储桶中。事件流是
SNS --> SQS <-- Lambda ---> S3 存储桶.
我正在尝试使用 cloudFormation 实现上述流程 template.I 在部署我的 CloudFormation 模板后收到以下错误消息。您可以提供的任何帮助将不胜感激。谢谢
- 11:51:56 2022-01-13 17:51:47,930 - 信息 - ...
- 11:52:53 2022-01-13 17:52:48,511 - 错误 - 堆栈 myDemoApp 显示回滚状态 ROLLBACK_IN_PROGRESS。
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - 在资源 EventStreamLambda 的 myDemoApp 堆栈中发现以下根本原因失败事件:
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - 资源处理程序返回消息:“GetObject 时发生错误。S3 错误代码:NoSuchKey。
S3 错误消息:指定的键不存在。 (服务:Lambda,状态码:400,请求
ID:5f2f9882-a863-4a58-90bd-7e0d0dfdf4d5,扩展请求 ID:null)”(RequestToken:0a95acb4-a677-0a2d-d0bc-8b7487a858ad,HandlerErrorCode:InvalidRequest)
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - ..
我的 lambda 函数是:
import json
import logging
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO,
format='%(asctime)s: %(levelname)s: %(message)s')
def lambda_handler(event, context):
logger.info(f"lambda_handler -- event: {json.dumps(event)}")
s3_bucket = boto3.resource("3")
event_message = json.loads(event["Records"][0]["body"])
s3_bucket.put_object(Bucket="S3DeployBucket", key="data.json", Body=json.dumps(event_message))
我完整的 CloudFormation 模板是:
{
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "myDemoApp Resource Stack",
"Mappings": {
},
"Parameters": {
"S3DeployBucket": {
"Default": "myDemoApp-deploy-bucket",
"Description": "Bucket for deployment configs and artifacts for myDemoApp",
"Type": "String"
},
"EnvName": {
"Description": "Platform environment name for myDemoApp",
"Type": "String"
},
"AuditRecordKeyArn": {
"Description": "ARN for audit record key encryption for myDemoApp",
"Type": "String"
},
"ParentVPCStack": {
"Description": "The name of the stack containing the parent VPC for myDemoApp",
"Type": "String"
},
"StackVersion": {
"Description": "The version of this stack of myDemoApp",
"Type": "String"
},
"EventLogFolderName": {
"Type": "String",
"Description": "folder name for the logs for the event stream of myDemoApp",
"Default": "event_log_stream"
},
"EventLogPartitionKeys": {
"Type": "String",
"Description": "The partition keys that audit logs will write to S3. Use Hive-style naming conventions for automatic Athena/Glue comprehension.",
"Default": "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}"
},
"AppEventSNSTopicArn": {
"Description": "Events SNS Topic of myDemoApp",
"Type": "String"
},
"ReportingEventsRetentionDays": {
"Default": "2192",
"Description": "The number of days to retain a record used for reporting.",
"Type": "String"
}
},
"Resources": {
"AppEventSQSQueue": {
"Type": "AWS::SQS::Queue"
},
"AppEventSnsSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "AppEventSNSTopicArn"
},
"Endpoint": {
"Fn::GetAtt": [
"AppEventSQSQueue",
"Arn"
]
},
"Protocol": "sqs"
}
},
"S3DeployBucket": {
"Type": "AWS::S3::Bucket",
"DeletionPolicy": "Retain",
"UpdateReplacePolicy": "Retain",
"Properties": {
"BucketEncryption": {
"ServerSideEncryptionConfiguration": [
{
"ServerSideEncryptionByDefault": {
"KMSMasterKeyID": {
"Ref": "AuditRecordKeyArn"
},
"SSEAlgorithm": "aws:kms"
}
}
]
},
"VersioningConfiguration": {
"Status": "Enabled"
},
"LifecycleConfiguration": {
"Rules": [
{
"ExpirationInDays": {
"Ref": "ReportingEventsRetentionDays"
},
"Status": "Enabled"
}
]
}
}
},
"EventStreamLogGroup": {
"Type": "AWS::Logs::LogGroup"
},
"EventLogStream": {
"Type": "AWS::Logs::LogStream",
"Properties": {
"LogGroupName": {
"Ref": "EventStreamLogGroup"
}
}
},
"EventStreamSubscriptionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"Policies": [
{
"PolicyName": "SNSSQSAccessPolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": {
"Action": [
"sqs:*"
],
"Effect": "Allow",
"Resource": "*"
}
}
}
]
}
},
"EventDeliveryRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sqs.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": {
"Ref": "AWS::AccountId"
}
}
}
}
]
}
}
},
"EventSqsQueuePolicy": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Version": "2012-10-17",
"Id": "SqsQueuePolicy",
"Statement": [
{
"Sid": "Allow-SNS-SendMessage",
"Effect": "Allow",
"Principal": "*",
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage"
],
"Resource": {
"Fn::GetAtt": [
"EventStreamLambda",
"Arn"
]
},
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "EventSNSTopicArn"
}
}
}
}
]
},
"Queues": [
{
"Ref": "EventSNSTopicArn"
}
]
}
},
"EventDeliveryPolicy": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "sqs_delivery_policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": [
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
"/*"
]
]
}
]
},
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents"
],
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${EventStreamLogGroup}:log-stream:${EventLogStreamLogStream}"
}
},
{
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey"
],
"Resource": [
{
"Ref": "AuditRecordKeyArn"
}
],
"Condition": {
"StringEquals": {
"kms:ViaService": {
"Fn::Join": [
"",
[
"s3.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
},
"StringLike": {
"kms:EncryptionContext:aws:s3:arn": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
"/*"
]
]
}
}
}
}
]
},
"Roles": [
{
"Ref": "EventDeliveryRole"
}
]
}
},
"EventStreamLambda": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Runtime": "python3.8",
"Timeout": 30,
"FunctionName": "sqs_s3_pipeline_job",
"Role": {
"Fn::GetAtt": [
"SQSLambdaExecutionRole",
"Arn"
]
},
"Code": {
"S3Bucket": {
"Ref": "S3DeployBucket"
},
"S3Key": {
"Ref": "S3DeployBucket"
}
},
"TracingConfig": {
"Mode": "Active"
}
}
},
"SQSLambdaExecutionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}
]
},
"Policies": [
{
"PolicyName": "StreamLambdaLogs",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
},
{
"PolicyName": "SQSLambdaPolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility"
],
"Resource":"*"
}
]
}
}
]
}
}
},
"Outputs": {
"VpcSubnet3ExportKey": {
"Value": {
"Fn::Sub": "${ParentVPCStack}-privateSubnet3"
}
}
}
}
SubscriptionRoleArn
是 only 运动:
This property applies only to Amazon Kinesis Data Firehose delivery stream subscriptions.
我是 AWS CloudFormation 的新手,我正在尝试从 SQS 队列中捕获事件并通过 AWS lambda 将它们放入 S3 存储桶中。事件流是 SNS --> SQS <-- Lambda ---> S3 存储桶.
我正在尝试使用 cloudFormation 实现上述流程 template.I 在部署我的 CloudFormation 模板后收到以下错误消息。您可以提供的任何帮助将不胜感激。谢谢
- 11:51:56 2022-01-13 17:51:47,930 - 信息 - ...
- 11:52:53 2022-01-13 17:52:48,511 - 错误 - 堆栈 myDemoApp 显示回滚状态 ROLLBACK_IN_PROGRESS。
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - 在资源 EventStreamLambda 的 myDemoApp 堆栈中发现以下根本原因失败事件:
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - 资源处理程序返回消息:“GetObject 时发生错误。S3 错误代码:NoSuchKey。 S3 错误消息:指定的键不存在。 (服务:Lambda,状态码:400,请求 ID:5f2f9882-a863-4a58-90bd-7e0d0dfdf4d5,扩展请求 ID:null)”(RequestToken:0a95acb4-a677-0a2d-d0bc-8b7487a858ad,HandlerErrorCode:InvalidRequest)
- 11:52:53 2022-01-13 17:52:48,674 - 信息 - ..
我的 lambda 函数是:
import json
import logging
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO,
format='%(asctime)s: %(levelname)s: %(message)s')
def lambda_handler(event, context):
logger.info(f"lambda_handler -- event: {json.dumps(event)}")
s3_bucket = boto3.resource("3")
event_message = json.loads(event["Records"][0]["body"])
s3_bucket.put_object(Bucket="S3DeployBucket", key="data.json", Body=json.dumps(event_message))
我完整的 CloudFormation 模板是:
{
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "myDemoApp Resource Stack",
"Mappings": {
},
"Parameters": {
"S3DeployBucket": {
"Default": "myDemoApp-deploy-bucket",
"Description": "Bucket for deployment configs and artifacts for myDemoApp",
"Type": "String"
},
"EnvName": {
"Description": "Platform environment name for myDemoApp",
"Type": "String"
},
"AuditRecordKeyArn": {
"Description": "ARN for audit record key encryption for myDemoApp",
"Type": "String"
},
"ParentVPCStack": {
"Description": "The name of the stack containing the parent VPC for myDemoApp",
"Type": "String"
},
"StackVersion": {
"Description": "The version of this stack of myDemoApp",
"Type": "String"
},
"EventLogFolderName": {
"Type": "String",
"Description": "folder name for the logs for the event stream of myDemoApp",
"Default": "event_log_stream"
},
"EventLogPartitionKeys": {
"Type": "String",
"Description": "The partition keys that audit logs will write to S3. Use Hive-style naming conventions for automatic Athena/Glue comprehension.",
"Default": "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}"
},
"AppEventSNSTopicArn": {
"Description": "Events SNS Topic of myDemoApp",
"Type": "String"
},
"ReportingEventsRetentionDays": {
"Default": "2192",
"Description": "The number of days to retain a record used for reporting.",
"Type": "String"
}
},
"Resources": {
"AppEventSQSQueue": {
"Type": "AWS::SQS::Queue"
},
"AppEventSnsSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "AppEventSNSTopicArn"
},
"Endpoint": {
"Fn::GetAtt": [
"AppEventSQSQueue",
"Arn"
]
},
"Protocol": "sqs"
}
},
"S3DeployBucket": {
"Type": "AWS::S3::Bucket",
"DeletionPolicy": "Retain",
"UpdateReplacePolicy": "Retain",
"Properties": {
"BucketEncryption": {
"ServerSideEncryptionConfiguration": [
{
"ServerSideEncryptionByDefault": {
"KMSMasterKeyID": {
"Ref": "AuditRecordKeyArn"
},
"SSEAlgorithm": "aws:kms"
}
}
]
},
"VersioningConfiguration": {
"Status": "Enabled"
},
"LifecycleConfiguration": {
"Rules": [
{
"ExpirationInDays": {
"Ref": "ReportingEventsRetentionDays"
},
"Status": "Enabled"
}
]
}
}
},
"EventStreamLogGroup": {
"Type": "AWS::Logs::LogGroup"
},
"EventLogStream": {
"Type": "AWS::Logs::LogStream",
"Properties": {
"LogGroupName": {
"Ref": "EventStreamLogGroup"
}
}
},
"EventStreamSubscriptionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"Policies": [
{
"PolicyName": "SNSSQSAccessPolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": {
"Action": [
"sqs:*"
],
"Effect": "Allow",
"Resource": "*"
}
}
}
]
}
},
"EventDeliveryRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sqs.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": {
"Ref": "AWS::AccountId"
}
}
}
}
]
}
}
},
"EventSqsQueuePolicy": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Version": "2012-10-17",
"Id": "SqsQueuePolicy",
"Statement": [
{
"Sid": "Allow-SNS-SendMessage",
"Effect": "Allow",
"Principal": "*",
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage"
],
"Resource": {
"Fn::GetAtt": [
"EventStreamLambda",
"Arn"
]
},
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "EventSNSTopicArn"
}
}
}
}
]
},
"Queues": [
{
"Ref": "EventSNSTopicArn"
}
]
}
},
"EventDeliveryPolicy": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyName": "sqs_delivery_policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": [
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
"/*"
]
]
}
]
},
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents"
],
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${EventStreamLogGroup}:log-stream:${EventLogStreamLogStream}"
}
},
{
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey"
],
"Resource": [
{
"Ref": "AuditRecordKeyArn"
}
],
"Condition": {
"StringEquals": {
"kms:ViaService": {
"Fn::Join": [
"",
[
"s3.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
},
"StringLike": {
"kms:EncryptionContext:aws:s3:arn": {
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"S3DeployBucket",
"Arn"
]
},
"/*"
]
]
}
}
}
}
]
},
"Roles": [
{
"Ref": "EventDeliveryRole"
}
]
}
},
"EventStreamLambda": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Handler": "lambda_function.lambda_handler",
"MemorySize": 128,
"Runtime": "python3.8",
"Timeout": 30,
"FunctionName": "sqs_s3_pipeline_job",
"Role": {
"Fn::GetAtt": [
"SQSLambdaExecutionRole",
"Arn"
]
},
"Code": {
"S3Bucket": {
"Ref": "S3DeployBucket"
},
"S3Key": {
"Ref": "S3DeployBucket"
}
},
"TracingConfig": {
"Mode": "Active"
}
}
},
"SQSLambdaExecutionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}
]
},
"Policies": [
{
"PolicyName": "StreamLambdaLogs",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
},
{
"PolicyName": "SQSLambdaPolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility"
],
"Resource":"*"
}
]
}
}
]
}
}
},
"Outputs": {
"VpcSubnet3ExportKey": {
"Value": {
"Fn::Sub": "${ParentVPCStack}-privateSubnet3"
}
}
}
}
SubscriptionRoleArn
是 only 运动:
This property applies only to Amazon Kinesis Data Firehose delivery stream subscriptions.