DynamoDB Stream 总是发送相同的事件?
DynamoDB Stream sending always the same event?
我正在使用 dynamodb 流来触发 lambda 函数。
我的serverless.yml文件是这样的:
functions:
main:
handler: app.main.handler
events:
- http:
method: any
path: /{proxy+}
# to keep lambda function warm
- schedule:
rate: rate(5 minutes)
input:
warmer: true
# triggered when a new insertion is made in the dynamodb table
- stream:
type: dynamodb
arn:
Fn::GetAtt: [AsyncTaskTable, StreamArn]
resources:
Resources:
AsyncTaskTable:
Type: 'AWS::DynamoDB::Table'
Properties:
TableName: ${self:custom.AsyncTaskTableName}
AttributeDefinitions:
-
AttributeName: "uuid"
AttributeType: "S"
KeySchema:
-
AttributeName: "uuid"
KeyType: "HASH"
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: "deletion_date_time"
Enabled: true
StreamSpecification:
StreamViewType: NEW_IMAGE
我的处理程序是这样的:
def handler(event, context):
print(event)
if event.get('warmer'):
pass
elif event.get('Records'):
print('process async')
# On convertit au bon format de dictionnaire
event = json_util.loads(event)
for record in event['Records']:
if record['eventName'] == 'INSERT':
python_module = record['dynamodb']['NewImage']['python_module']
python_function = record['dynamodb']['NewImage']['python_function']
uuid = record['dynamodb']['NewImage']['uuid']
params = record['dynamodb']['NewImage']['params']
getattr(sys.modules[python_module], python_function)(uuid, params)
else:
print('else')
一切都与 dynamodb table 和处理程序完美配合,但由于我不明白的原因,我的处理程序接收到的事件在流中发生时总是相同的!
如果是计划事件或 http 事件,那么我会得到正确的事件并且始终对应于发送的数据,但是当它被 dynamodb 流触发时,它总是相同的事件!
我已经苦苦挣扎了 3 个小时,试图弄明白,但无法理解,我在我的 dynamodb table 中创建了一个新记录,这与之前的记录无关,我仍会收到具有同一事件数据的事件。
我已经删除了 dynamodb 中的所有项目 table 但它仍然是一样的,我收到一个我不知道它来自哪里的事件。总是一样的。
例如下面的打印(事件)我总是在我的发电机数据库 table(inserting/deleting)上做任何事情,我为我的创建了一个新的 table staging/production 环境并且两者的行为方式相同
在日志中,您可以看到日志的日期与 eventSourceARN 不匹配,这对应于数据库中项目的第一次创建,即第一次调用 dynamodb 流。我也很确定我的处理程序也在做其他失败的事情。这可能是原因吗?也就是说,只要我的进程无法正常工作,它就会重播流?
2020-06-09T20:45:06.963+02:00
{'Records': [{'eventID': 'b607c13dc12e16d6602890fb7ab6f418', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'eu-west-3', 'dynamodb': {'ApproximateCreationDateTime': 1591726134.0, 'Keys': {'uuid': {'S': '1234'}}, 'NewImage': {'uuid': {'S': '1234'}}, 'SequenceNumber': '100000000002840891918', 'SizeBytes': 16, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:eu-west-3:213248478927:table/async_task-production/stream/2020-06-09T17:58:45.451'}]}
这是正常的。如果 DynamoDB Streams 触发了您的 Lambda 函数并且您的 Lambda 函数失败,那么 Lambda 调用将使用相同的数据重试,直到事件源成功或过期。
Lambda 现在支持 additional failure-handling features,但在典型情况下,您只需修复 Lambda 函数中的错误,这样它就不会失败,因此不会重试。
我正在使用 dynamodb 流来触发 lambda 函数。
我的serverless.yml文件是这样的:
functions:
main:
handler: app.main.handler
events:
- http:
method: any
path: /{proxy+}
# to keep lambda function warm
- schedule:
rate: rate(5 minutes)
input:
warmer: true
# triggered when a new insertion is made in the dynamodb table
- stream:
type: dynamodb
arn:
Fn::GetAtt: [AsyncTaskTable, StreamArn]
resources:
Resources:
AsyncTaskTable:
Type: 'AWS::DynamoDB::Table'
Properties:
TableName: ${self:custom.AsyncTaskTableName}
AttributeDefinitions:
-
AttributeName: "uuid"
AttributeType: "S"
KeySchema:
-
AttributeName: "uuid"
KeyType: "HASH"
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: "deletion_date_time"
Enabled: true
StreamSpecification:
StreamViewType: NEW_IMAGE
我的处理程序是这样的:
def handler(event, context):
print(event)
if event.get('warmer'):
pass
elif event.get('Records'):
print('process async')
# On convertit au bon format de dictionnaire
event = json_util.loads(event)
for record in event['Records']:
if record['eventName'] == 'INSERT':
python_module = record['dynamodb']['NewImage']['python_module']
python_function = record['dynamodb']['NewImage']['python_function']
uuid = record['dynamodb']['NewImage']['uuid']
params = record['dynamodb']['NewImage']['params']
getattr(sys.modules[python_module], python_function)(uuid, params)
else:
print('else')
一切都与 dynamodb table 和处理程序完美配合,但由于我不明白的原因,我的处理程序接收到的事件在流中发生时总是相同的!
如果是计划事件或 http 事件,那么我会得到正确的事件并且始终对应于发送的数据,但是当它被 dynamodb 流触发时,它总是相同的事件!
我已经苦苦挣扎了 3 个小时,试图弄明白,但无法理解,我在我的 dynamodb table 中创建了一个新记录,这与之前的记录无关,我仍会收到具有同一事件数据的事件。
我已经删除了 dynamodb 中的所有项目 table 但它仍然是一样的,我收到一个我不知道它来自哪里的事件。总是一样的。
例如下面的打印(事件)我总是在我的发电机数据库 table(inserting/deleting)上做任何事情,我为我的创建了一个新的 table staging/production 环境并且两者的行为方式相同 在日志中,您可以看到日志的日期与 eventSourceARN 不匹配,这对应于数据库中项目的第一次创建,即第一次调用 dynamodb 流。我也很确定我的处理程序也在做其他失败的事情。这可能是原因吗?也就是说,只要我的进程无法正常工作,它就会重播流?
2020-06-09T20:45:06.963+02:00
{'Records': [{'eventID': 'b607c13dc12e16d6602890fb7ab6f418', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'eu-west-3', 'dynamodb': {'ApproximateCreationDateTime': 1591726134.0, 'Keys': {'uuid': {'S': '1234'}}, 'NewImage': {'uuid': {'S': '1234'}}, 'SequenceNumber': '100000000002840891918', 'SizeBytes': 16, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:eu-west-3:213248478927:table/async_task-production/stream/2020-06-09T17:58:45.451'}]}
这是正常的。如果 DynamoDB Streams 触发了您的 Lambda 函数并且您的 Lambda 函数失败,那么 Lambda 调用将使用相同的数据重试,直到事件源成功或过期。
Lambda 现在支持 additional failure-handling features,但在典型情况下,您只需修复 Lambda 函数中的错误,这样它就不会失败,因此不会重试。