如何执行通过 Step 函数将对象从一个 S3 存储桶复制到另一个存储桶的 lambda 函数?

How to execute a lambda function which copies objects from one S3 bucket to another via Step functions?

我能够使用 lambda 函数执行将数据从源存储桶复制到目标存储桶的任务,但是,我在 Step 函数中执行 lambda 函数时出错。以下是我从头开始遵循的步骤。

  1. 选择的区域是 ap-south-1
  2. 创建了 2 个存储桶。源存储桶:start.bucket & 目标存储桶:final.bucket
  3. 使用以下信息创建了一个 Lambda 函数:
    • 从零开始创作
    • 函数名称:CopyCopy
    • 运行时间:Python 3.8
    • 已创建 lambda IAM 角色:LambdaCopy 并提供必要的策略(S3 完全访问权限和 Step 函数完全访问权限)并将其附加到函数。
    • 添加了触发器并选择了:
      • S3
      • 桶:start.bucket
      • 事件类型:所有对象创建事件
    • 我在GeeksforGeeks中找到了一段python代码,应用到代码部分
import json
import boto3
s3_client=boto3.client('s3')

# lambda function to copy file from 1 s3 to another s3
def lambda_handler(event, context):
    #specify source bucket
    source_bucket_name=event['Records'][0]['s3']['bucket']['name']
    #get object that has been uploaded
    file_name=event['Records'][0]['s3']['object']['key']
    #specify destination bucket
    destination_bucket_name='final.bucket'
    #specify from where file needs to be copied
    copy_object={'Bucket':source_bucket_name,'Key':file_name}
    #write copy statement 
    s3_client.copy_object(CopySource=copy_object,Bucket=destination_bucket_name,Key=file_name)

    return {
        'statusCode': 3000,
        'body': json.dumps('File has been Successfully Copied')
    }
- I deployed the code and it worked. Uploaded a csv file in start.bucket and it was copied to final.bucket.

然后,我使用以下信息在 Step 函数中创建了一个状态机:

  1. 直观地设计您的工作流程
  2. 类型:标准
  3. 在开始和结束状态之间拖动 AWS Lambda。
    • 更名为 LambdaCopy
    • 集成类型:优化
    • 在API参数下,函数名称(我选择了我创建的Lambda函数):CopyCopy:$LATEST
    • 下一个状态:结束
  4. 下一个然后再下一个
  5. 状态机名称:StepLambdaCopy
    • IAM 角色:创建一个新角色(后来赋予它 S3 完全访问权限、Lambdafullaccess 和 Step 函数完全访问权限)。

当我尝试执行它时显示错误。 我知道我错过了什么。非常感谢您的帮助。

Step 函数现在允许您直接使用 S3 Copy SDK,完全绕过对 Lambda 和 boto3 的需求。查看here了解更多信息。

因此,在您的情况下,您需要一个如下所示的简单任务:

{
  "Comment": "A description of my state machine",
  "StartAt": "CopyObject",
  "States": {
    "CopyObject": {
      "Type": "Task",
      "End": true,
      "Parameters": {
        "ServerSideEncryption": "AES256",
        "Bucket.$": "$.destination_bucket",
        "CopySource.$": "$.source_path",
        "Key.$": "$.key"
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:copyObject"
    }
  }
}

然后您的输入状态将需要输入您通常使用复制命令复制文件的参数。 Source Path, Destination Bucket, Object Key 和boto3命令完全一样。

注意:您的状态机 IAM 角色将需要直接 S3 权限,并且需要与存储桶位于同一区域。

总是混淆您必须作为参数传递的确切内容。这是我用来复制 Athena 查询输出的模板。您可以根据自己的需要进行调整:

"athena_score": {
  "Type": "Task",
  "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
  "Parameters": {
    "QueryExecutionContext": {
      "Catalog": "${AthenaCatalog}",
      "Database": "${AthenaDatabase}"
    },
    "QueryString": "SELECT ...",
    "WorkGroup": "${AthenaWorkGroup}",
    "ResultConfiguration": {
      "OutputLocation": "s3://${BucketName}/${OutputPath}"
    }
  },
  "TimeoutSeconds": 300,
  "ResultPath": "$.responseBody",
  "Next": "copy_csv"
},
"copy_csv": {
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:s3:copyObject",
  "Parameters": {
    "Bucket": "${BucketName}",
    "CopySource.$": "States.Format('/${BucketName}/${OutputPath}/{}.csv', $.responseBody.QueryExecution.QueryExecutionId)",
    "Key": "${OutputPath}/latest.csv"
  },
  "ResultPath": "$.responseBody.CopyObject",
  "Ent": "true"
}