需要有关 CloudFormation 模板和 AWS lambda 的帮助,以便通过 lambda 将事件从 SQS 拉到 S3

Need help on CloudFormation template and AWS lambda for pulling events from SQS to S3 via lambda

我是 AWS CloudFormation 的新手,我正在尝试从 SQS 队列中捕获事件并通过 AWS lambda 将它们放入 S3 存储桶中。事件流是 SNS --> SQS <-- Lambda ---> S3 存储桶.

我正在尝试使用 cloudFormation 实现上述流程 template.I 在部署我的 CloudFormation 模板后收到以下错误消息。您可以提供的任何帮助将不胜感激。谢谢

我的 lambda 函数是:

import json
import logging

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')


def lambda_handler(event, context):
    logger.info(f"lambda_handler -- event: {json.dumps(event)}")

    s3_bucket = boto3.resource("3")

    event_message = json.loads(event["Records"][0]["body"])
    s3_bucket.put_object(Bucket="S3DeployBucket", key="data.json",  Body=json.dumps(event_message))

我完整的 CloudFormation 模板是:

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "myDemoApp Resource Stack",
  "Mappings": {

  },
  "Parameters": {
    "S3DeployBucket": {
      "Default": "myDemoApp-deploy-bucket",
      "Description": "Bucket for deployment configs and artifacts for myDemoApp",
      "Type": "String"
    },
    "EnvName": {
      "Description": "Platform environment name for myDemoApp",
      "Type": "String"
    },
    "AuditRecordKeyArn": {
      "Description": "ARN for audit record key encryption for myDemoApp",
      "Type": "String"
    },
    "ParentVPCStack": {
      "Description": "The name of the stack containing the parent VPC for myDemoApp",
      "Type": "String"
    },
    "StackVersion": {
      "Description": "The version of this stack of myDemoApp",
      "Type": "String"
    },
    "EventLogFolderName": {
      "Type": "String",
      "Description": "folder name for the logs for the event stream of myDemoApp",
      "Default": "event_log_stream"
    },
    "EventLogPartitionKeys": {
      "Type": "String",
      "Description": "The partition keys that audit logs will write to S3. Use Hive-style naming conventions for automatic Athena/Glue comprehension.",
      "Default": "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}"
    },
    "AppEventSNSTopicArn": {
      "Description": "Events SNS Topic of myDemoApp",
      "Type": "String"
    },
    "ReportingEventsRetentionDays": {
      "Default": "2192",
      "Description": "The number of days to retain a record used for reporting.",
      "Type": "String"
    }
  },
  "Resources": {
    "AppEventSQSQueue": {
      "Type": "AWS::SQS::Queue"
    },

    "AppEventSnsSubscription": {
      "Type": "AWS::SNS::Subscription",
      "Properties": {
        "TopicArn": {
          "Ref": "AppEventSNSTopicArn"
        },
        "Endpoint": {
          "Fn::GetAtt": [
            "AppEventSQSQueue",
            "Arn"
          ]
        },
        "Protocol": "sqs"
      }
    },

    "S3DeployBucket": {
      "Type": "AWS::S3::Bucket",
      "DeletionPolicy": "Retain",
      "UpdateReplacePolicy": "Retain",
      "Properties": {
        "BucketEncryption": {
          "ServerSideEncryptionConfiguration": [
            {
              "ServerSideEncryptionByDefault": {
                "KMSMasterKeyID": {
                  "Ref": "AuditRecordKeyArn"
                },
                "SSEAlgorithm": "aws:kms"
              }
            }
          ]
        },
        "VersioningConfiguration": {
          "Status": "Enabled"
        },
        "LifecycleConfiguration": {
          "Rules": [
            {
              "ExpirationInDays": {
                "Ref": "ReportingEventsRetentionDays"
              },
              "Status": "Enabled"
            }
          ]
        }
      }
    },
    "EventStreamLogGroup": {
      "Type": "AWS::Logs::LogGroup"
    },
    "EventLogStream": {
      "Type": "AWS::Logs::LogStream",
      "Properties": {
        "LogGroupName": {
          "Ref": "EventStreamLogGroup"
        }
      }
    },
    "EventStreamSubscriptionRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "sns.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        },
        "Policies": [
          {
            "PolicyName": "SNSSQSAccessPolicy",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": {
                "Action": [
                  "sqs:*"
                ],
                "Effect": "Allow",
                "Resource": "*"
              }
            }
          }
        ]
      }
    },
    "EventDeliveryRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "sqs.amazonaws.com"
              },
              "Action": "sts:AssumeRole",
              "Condition": {
                "StringEquals": {
                  "sts:ExternalId": {
                    "Ref": "AWS::AccountId"
                  }
                }
              }
            }
          ]
        }
      }
    },
    "EventSqsQueuePolicy": {
      "Type": "AWS::SQS::QueuePolicy",
      "Properties": {
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Id": "SqsQueuePolicy",
          "Statement": [
            {
              "Sid": "Allow-SNS-SendMessage",
              "Effect": "Allow",
              "Principal": "*",
              "Action": [
                "sqs:SendMessage",
                "sqs:ReceiveMessage"
              ],
              "Resource": {
                "Fn::GetAtt": [
                  "EventStreamLambda",
                  "Arn"
                ]
              },
              "Condition": {
                "ArnEquals": {
                  "aws:SourceArn": {
                    "Ref": "EventSNSTopicArn"
                  }
                }
              }
            }
          ]
        },
        "Queues": [
          {
            "Ref": "EventSNSTopicArn"
          }
        ]
      }
    },
    "EventDeliveryPolicy": {
      "Type": "AWS::IAM::Policy",
      "Properties": {
        "PolicyName": "sqs_delivery_policy",
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [
                "s3:PutObject"
              ],
              "Resource": [
                {
                  "Fn::GetAtt": [
                    "S3DeployBucket",
                    "Arn"
                  ]
                },
                {
                  "Fn::Join": [
                    "",
                    [
                      {
                        "Fn::GetAtt": [
                          "S3DeployBucket",
                          "Arn"
                        ]
                      },
                      "/*"
                    ]
                  ]
                }
              ]
            },
            {
              "Effect": "Allow",
              "Action": [
                "logs:PutLogEvents"
              ],
              "Resource": {
                "Fn::Sub": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:${EventStreamLogGroup}:log-stream:${EventLogStreamLogStream}"
              }
            },
            {
              "Effect": "Allow",
              "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey"
              ],
              "Resource": [
                {
                  "Ref": "AuditRecordKeyArn"
                }
              ],
              "Condition": {
                "StringEquals": {
                  "kms:ViaService": {
                    "Fn::Join": [
                      "",
                      [
                        "s3.",
                        {
                          "Ref": "AWS::Region"
                        },
                        ".amazonaws.com"
                      ]
                    ]
                  }
                },
                "StringLike": {
                  "kms:EncryptionContext:aws:s3:arn": {
                    "Fn::Join": [
                      "",
                      [
                        {
                          "Fn::GetAtt": [
                            "S3DeployBucket",
                            "Arn"
                          ]
                        },
                        "/*"
                      ]
                    ]
                  }
                }
              }
            }
          ]
        },
        "Roles": [
          {
            "Ref": "EventDeliveryRole"
          }
        ]
      }
    },
    "EventStreamLambda": {
      "Type": "AWS::Lambda::Function",
      "Properties": {
        "Handler": "lambda_function.lambda_handler",
        "MemorySize": 128,
        "Runtime": "python3.8",
        "Timeout": 30,
        "FunctionName": "sqs_s3_pipeline_job",
        "Role": {
          "Fn::GetAtt": [
            "SQSLambdaExecutionRole",
            "Arn"
          ]
        },
        "Code": {
          "S3Bucket": {
            "Ref": "S3DeployBucket"
          },
          "S3Key": {
            "Ref": "S3DeployBucket"
          }
        },
        "TracingConfig": {
          "Mode": "Active"
        }
      }
    },
    "SQSLambdaExecutionRole": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": [
                  "lambda.amazonaws.com"
                ]
              },
              "Action": [
                "sts:AssumeRole"
              ]
            }
          ]
        },
        "Policies": [
          {
            "PolicyName": "StreamLambdaLogs",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [
                    "logs:*"
                  ],
                  "Resource": "arn:aws:logs:*:*:*"
                }
              ]
            }
          },
          {
            "PolicyName": "SQSLambdaPolicy",
            "PolicyDocument": {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:ChangeMessageVisibility"
                  ],
                  "Resource":"*"
                }
              ]
            }
          }
        ]
      }
    }
  },
  "Outputs": {
    "VpcSubnet3ExportKey": {
      "Value": {
        "Fn::Sub": "${ParentVPCStack}-privateSubnet3"
      }
    }
  }
}

SubscriptionRoleArnonly 运动:

This property applies only to Amazon Kinesis Data Firehose delivery stream subscriptions.