DynamoDB Stream 总是发送相同的事件?

DynamoDB Stream sending always the same event?

我正在使用 dynamodb 流来触发 lambda 函数。

我的serverless.yml文件是这样的:

functions:
  main:
    handler: app.main.handler
    events:
      - http:
          method: any
          path: /{proxy+}
      # to keep lambda function warm
      - schedule:
          rate: rate(5 minutes)
          input:
            warmer: true
      # triggered when a new insertion is made in the dynamodb table
      - stream:
          type: dynamodb
          arn:
            Fn::GetAtt: [AsyncTaskTable, StreamArn]

resources:
  Resources:
    AsyncTaskTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        TableName: ${self:custom.AsyncTaskTableName}
        AttributeDefinitions:
          -
            AttributeName: "uuid"
            AttributeType: "S"
        KeySchema:
          -
            AttributeName: "uuid"
            KeyType: "HASH"
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TimeToLiveSpecification:
            AttributeName: "deletion_date_time"
            Enabled: true
        StreamSpecification:
          StreamViewType: NEW_IMAGE

我的处理程序是这样的:

def handler(event, context):
    print(event)

    if event.get('warmer'):
        pass

    elif event.get('Records'):
        print('process async')

    # On convertit au bon format de dictionnaire
    event = json_util.loads(event)

    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            python_module = record['dynamodb']['NewImage']['python_module']
            python_function = record['dynamodb']['NewImage']['python_function']
            uuid = record['dynamodb']['NewImage']['uuid']
            params = record['dynamodb']['NewImage']['params']
            getattr(sys.modules[python_module], python_function)(uuid, params)

    else:
        print('else')

一切都与 dynamodb table 和处理程序完美配合,但由于我不明白的原因,我的处理程序接收到的事件在流中发生时总是相同的!

如果是计划事件或 http 事件,那么我会得到正确的事件并且始终对应于发送的数据,但是当它被 dynamodb 流触发时,它总是相同的事件!

我已经苦苦挣扎了 3 个小时,试图弄明白,但无法理解,我在我的 dynamodb table 中创建了一个新记录,这与之前的记录无关,我仍会收到具有同一事件数据的事件。

我已经删除了 dynamodb 中的所有项目 table 但它仍然是一样的,我收到一个我不知道它来自哪里的事件。总是一样的。

例如下面的打印(事件)我总是在我的发电机数据库 table(inserting/deleting)上做任何事情,我为我的创建了一个新的 table staging/production 环境并且两者的行为方式相同 在日志中,您可以看到日志的日期与 eventSourceARN 不匹配,这对应于数据库中项目的第一次创建,即第一次调用 dynamodb 流。我也很确定我的处理程序也在做其他失败的事情。这可能是原因吗?也就是说,只要我的进程无法正常工作,它就会重播流?

2020-06-09T20:45:06.963+02:00
{'Records': [{'eventID': 'b607c13dc12e16d6602890fb7ab6f418', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'eu-west-3', 'dynamodb': {'ApproximateCreationDateTime': 1591726134.0, 'Keys': {'uuid': {'S': '1234'}}, 'NewImage': {'uuid': {'S': '1234'}}, 'SequenceNumber': '100000000002840891918', 'SizeBytes': 16, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:eu-west-3:213248478927:table/async_task-production/stream/2020-06-09T17:58:45.451'}]}

这是正常的。如果 DynamoDB Streams 触发了您的 Lambda 函数并且您的 Lambda 函数失败,那么 Lambda 调用将使用相同的数据重试,直到事件源成功或过期。

Lambda 现在支持 additional failure-handling features,但在典型情况下,您只需修复 Lambda 函数中的错误,这样它就不会失败,因此不会重试。