如何仅在将特定文件(密钥)写入 S3 存储桶时触发 AWS 事件规则

How to trigger AWS Events Rule only when a specific file (key) gets written to an S3 Bucket

我正在尝试创建一个 AWS 事件(在 CloudWatch 或 EventBridge 中),当将特定文件放入 S3 存储桶时触发 AWS Step Function 的 运行。我的规则事件模式如下所示:

{
  "source": [
    "aws.s3"
  ],
  "detail-type": [
    "AWS API Call via CloudTrail"
  ],
  "detail": {
    "eventSource": [
      "s3.amazonaws.com"
    ],
    "eventName": [
      "PutObject"
    ],
    "requestParameters": {
      "bucketName": [
        "bucketname"
      ],
      "key": [
        "date={{TODAYS DATE}}/_SUCCESS"
      ]
    }
  }
}

我希望 key 元素指向一个路径,其中 TODAYS DATE 表示当前日期,_SUCCCESS 是我的作业打印到目录的空文件一旦成功完成(例如,如果今天是 2019 年 10 月 31 日,要检查的完整存储桶路径将是 bucketname/date=20191031/_SUCCESS)。最终目标是让事件规则触发一个 Step Function,它控制许多其他日常作业,一旦第一个将 _SUCCESS 文件输出到存储桶的作业成功完成,这些日常作业只能 运行。

我最好使用当天的当前日期对 _SUCCESS 文件进行密钥检查。但是,如果没有好的方法来处理日期,如果有一种方法可以在将新目录放入存储桶时触发一次规则(例如,当目录 date=XXXXXX 被建造)。我只是不能在每次将任何新文件放入存储桶时都激活触发器,因为初始作业将在 date=XXXXXX 目录中创建许多输出文件,这些文件用作以下作业的输入。

能够通过 AWS CloudFormation 创建此规则也将非常有帮助,因此,如果 CloudFormation 有任何方法来处理这些问题,那就太好了。

提前感谢您的帮助,非常感谢。

我不确定我是否理解您在这里想要实现的目标,但您为什么不将 lambda 函数订阅到存储您的文件的存储桶(订阅它放置事件),做任何类型的检查您想在该 lambda 函数内部以编程方式执行,如果满足所有条件,只需从 lambda 函数中调用上述步骤函数即可。

如果不满足任何条件,则不要启动步骤功能。

以下是您如何将 lambda 函数订阅到 S3 put 事件(通过 Web 控制台)。

  1. 转到S3
  2. 选择你的桶
  3. 转到Properties tab
  4. select Events
  5. 检查PUT事件
  6. Send to下,选择Lambda Function
  7. 选择现有的 lambda 函数(您需要创建该 lambda 函数)

如何从 lambda 函数中访问事件的存储桶名称、对象键和时间戳等属性。 (使用 Python)

def handler_name(event, context): 
    // get bucket name
    print(event['Records'][0]['s3']['bucket']['name'])

    // get object key
    print(event['Records'][0]['s3']['object']['key'])

    // get event timestamp
    print(event['Records'][0]['eventTime'])

    return 0

这里是完整的event对象(即S3事件对象)供参考。

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-2",
      "eventTime": "2019-09-03T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }

  ]
}

如何从 Lambda 函数中执行 Step Function(使用 Python + Boto3)

import boto3

sfn_client = boto3.client('stepfunctions')

def handler_name(event, context): 

    response = sfn_client.start_execution(
        stateMachineArn='string',
        name='string',
        input='string'
    )

    return 0

其中 stateMachineArn 是要执行的状态机的 Amazon 资源名称 (ARN),name(可选)是执行的名称,input 是包含的字符串JSON 执行的输入数据。