在 Lambda 中重新处理 DLQ 事件
Re-process DLQ events in Lambda
我有一个配置了 SQS DeadLetterQueue 的 AWS Lambda 函数 'A'。当 Lambda 无法处理事件时,会将其正确发送到 DLQ。有没有办法重新处理以 DLQ 结束的事件?
我找到了两个解决方案,但它们都有缺点:
- 创建一个新的 Lambda 函数 'B',它从 SQS 读取事件,然后将事件一一发送到之前的 Lambda 'A'。 -> 在这里我必须编写新代码并部署一个新函数
- 在事件到达 SQS 时再次触发 Lambda 'A' -> 这看起来很危险,因为我可能会导致循环执行
我理想的解决方案应该是使用 Lambda 'A' 按需重新处理丢弃的事件,而不是从头开始创建新的 Lambda 'B'。有办法实现吗?
最后,我没有从 AWS 找到任何解决方案来重新处理 Lambda 函数的 DLQ 事件。然后我创建了自己的自定义 Lambda 函数(我希望这对遇到相同问题的其他开发人员有所帮助):
import boto3
lamb = boto3.client('lambda')
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='my_dlq_name')
def lambda_handler(event, context):
for _ in range(100):
messages_to_delete = []
for message in queue.receive_messages(MaxNumberOfMessages=10):
payload_bytes_array = bytes(message.body, encoding='utf8')
# print(payload_bytes_array)
lamb.invoke(
FunctionName='my_lambda_name',
InvocationType="Event", # Event = Invoke the function asynchronously.
Payload=payload_bytes_array
)
# Add message to delete
messages_to_delete.append({
'Id': message.message_id,
'ReceiptHandle': message.receipt_handle
})
# If you don't receive any notifications the messages_to_delete list will be empty
if len(messages_to_delete) == 0:
break
# Delete messages to remove them from SQS queue handle any errors
else:
deleted = queue.delete_messages(Entries=messages_to_delete)
print(deleted)
部分代码灵感来自this post
我有一个配置了 SQS DeadLetterQueue 的 AWS Lambda 函数 'A'。当 Lambda 无法处理事件时,会将其正确发送到 DLQ。有没有办法重新处理以 DLQ 结束的事件?
我找到了两个解决方案,但它们都有缺点:
- 创建一个新的 Lambda 函数 'B',它从 SQS 读取事件,然后将事件一一发送到之前的 Lambda 'A'。 -> 在这里我必须编写新代码并部署一个新函数
- 在事件到达 SQS 时再次触发 Lambda 'A' -> 这看起来很危险,因为我可能会导致循环执行
我理想的解决方案应该是使用 Lambda 'A' 按需重新处理丢弃的事件,而不是从头开始创建新的 Lambda 'B'。有办法实现吗?
最后,我没有从 AWS 找到任何解决方案来重新处理 Lambda 函数的 DLQ 事件。然后我创建了自己的自定义 Lambda 函数(我希望这对遇到相同问题的其他开发人员有所帮助):
import boto3
lamb = boto3.client('lambda')
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='my_dlq_name')
def lambda_handler(event, context):
for _ in range(100):
messages_to_delete = []
for message in queue.receive_messages(MaxNumberOfMessages=10):
payload_bytes_array = bytes(message.body, encoding='utf8')
# print(payload_bytes_array)
lamb.invoke(
FunctionName='my_lambda_name',
InvocationType="Event", # Event = Invoke the function asynchronously.
Payload=payload_bytes_array
)
# Add message to delete
messages_to_delete.append({
'Id': message.message_id,
'ReceiptHandle': message.receipt_handle
})
# If you don't receive any notifications the messages_to_delete list will be empty
if len(messages_to_delete) == 0:
break
# Delete messages to remove them from SQS queue handle any errors
else:
deleted = queue.delete_messages(Entries=messages_to_delete)
print(deleted)
部分代码灵感来自this post