AWS Lambda 无法从 SQS 读取

AWS Lambda fails to read from SQS

当 SQS 队列中有消息但没有针对 logger.info(message.body) 的 CloudWatch 日志条目时,以下 lambda 函数成功执行。最后一个 CloudWatch 日志条目是针对 logger.info(msg_queue.url) 的,它正确地打印了队列 URL。但在那之后什么都没有。它甚至不启动 stepfunction。从我的本地计算机执行时,此函数运行良好。 Lambda 执行者角色具有对 SQS 和 StepFunction 的完全访问权限。我在这里错过了什么? TIA

参考链接: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.start_execution

import boto3
import hashlib
import logging
import uuid


logger = logging.getLogger("Queue_Listener")
logger.setLevel(logging.INFO)


ssm_client = boto3.client('ssm')
stepfn_client = boto3.client('stepfunctions')
sqs_client = boto3.resource('sqs')


def lambda_handler(event, context):

    step_func = ssm_client.get_parameter(Name='/stepfunctions/test-step-function')
    step_func_arn = step_func['Parameter']['Value']
    logger.info(step_func_arn)

    msg_queue = sqs_client.get_queue_by_name(QueueName='test-queue.fifo')
    logger.info(msg_queue.url)
    
    for message in msg_queue.receive_messages(MessageAttributeNames=['All'],
                                                          ReceiveRequestAttemptId=str(uuid.uuid4()),
                                                          WaitTimeSeconds=20):
        logger.info(message.body)
        # checking for message body corruption
        msg_hash = hashlib.md5(message.body.encode())
        digest = msg_hash.hexdigest()

        if digest != message.md5_of_body:
            logger.error("Message body is corrupted")

        if digest == message.md5_of_body:
            exec_name = "stepfn-" + str(uuid.uuid4())
            response = stepfn_client.start_execution(stateMachineArn=step_func_arn,
                                                     name=exec_name,
                                                     input=message.body,
                                                     traceHeader=exec_name
                                                     )
            # Let the queue know that the message is processed
            message.delete()
            logger.info(response)

            return {"execution_status":response}


起初我很困惑,如果 test-queue.fifo 是触发 lambda 函数的原因,您不会直接处理 event 而不是在函数逻辑中轮询队列。不过仔细想想,我认为这种模式 你的问题。也就是说,这是正在发生的事情:

  1. 一条消息到达 SQS 队列。
  2. 消息已发送到您的 lambda 并触发其执行。它在队列中不再可见,因为:

When Lambda reads a batch, the messages stay in the queue but are hidden for the length of the queue's visibility timeout. If your function successfully processes the batch, Lambda deletes the messages from the queue. (AWS docs source)

  1. lambda 轮询队列。那里什么都没有(可见)。 lambda returns 成功。
  2. 由于 lambda 成功完成,事件已从队列中删除。

您应该在 Lambda 和 SQS 之间使用已经为您实现的连接。上面的 link 概述了如何执行此操作。您可以选择一次(最多)向您的 lambda 发送多少条消息,您会将它们传递给该函数,并自动从队列中删除。

如果您真的真的很想使用您的手动方法,您需要更改您的 lambda 触发器(想到的想法是 Cloudwatch 指标触发器对未决消息的数量),或者更改消息可见性超时 and 添加逻辑以在处理后从队列中删除事件 and 确保您的轮询 + 处理 + 删除逻辑与 Lambda 的逻辑配合得很好成功时自动删除。我不推荐后一种方法:)