如何有效管理消息可见性超时?

How to manage message visibility timeout effectively?

根据事件的不同,我们的流程需要 30 秒到 5 分钟不等。我们的基本架构如下所示(python 3.9,boto3 到 send_message 到 sqs):

VPC和NAT网关配置完成。

我们发现后续消息总是以 10 分钟为间隔进行处理,尽管函数调用早先已成功完成。 我们打算更有效地管理时间,并尝试将消息可见性超时默认缩短为 1 分钟,并在函数 B 执行期间每 30 秒延长 1 分钟(我们使用 boto3 client.change_message_visibility)。它似乎不起作用 - 几乎所有消息都在我们的死信队列中结束(CloudWatch 日志没有提示任何错误)。

我们的代码:

  1. 基本代码(在我们尝试优化整个过程的时间之前;至少它的工作方式是成功处理所有消息;但是它花费了太多时间,因为消息以 10 分钟为间隔处理即使有些处理在例如 2 分钟内完成): Template.yaml 关于 SQS 和 lambda 函数的摘录 B:
Globals:
  Function:
    Timeout: 30
    Environment:
      Variables:
        SQS_QUEUE_URL: !Ref SqsQueue
Resources:
  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 600
      RedrivePolicy:
        deadLetterTargetArn:
          Fn::GetAtt:
            - DeadLetterQueue
            - Arn
        maxReceiveCount: 10
  DeadLetterQueue:
    Type: AWS::SQS::Queue
  LambdaFunctionBRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: receiveAndDeleteFromQueue
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [
                  sqs:ReceiveMessage,
                  sqs:ChangeMessageVisibility,
                  sqs:DeleteMessage,
                  sqs:GetQueueAttributes
                ]
                Resource: !GetAtt SqsQueue.Arn
        - PolicyName: accessVpc
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:  [
                  logs:CreateLogGroup,
                  logs:CreateLogStream,
                  logs:PutLogEvents,
                  ec2:CreateNetworkInterface,
                  ec2:DescribeNetworkInterfaces,
                  ec2:DeleteNetworkInterface,
                  ec2:AssignPrivateIpAddresses,
                  ec2:UnassignPrivateIpAddresses
                ]
                Resource: "*"
  LambdaFunctionB:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/function_b/
      Handler: app.lambda_handler
      Runtime: python3.9
      Role: !GetAtt LambdaFunctionBExecutionRole.Arn 
      Architectures:
        - x86_64
      VpcConfig:
        SecurityGroupIds:
          - "xxx"
        SubnetIds:
          - "xxx"
          - "xxx"
          - "xxx"
      FileSystemConfigs:
        - Arn: !GetAtt AccessPoint.Arn
          LocalMountPath: /mnt/lambda
      ReservedConcurrentExecutions: 1
      Events:
        SqsEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt SqsQueue.Arn
            BatchSize: 1
            Enabled: True
      Timeout: 600
      MemorySize: 512 

Lambda 函数 B 代码app.py:

import os
import boto3
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

def lambda_handler(event, context):
    status = "ok"

    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    
    <some business logic here>
    
    return status
  1. 在我们决定尝试优化时间效率后,我们为 SqsQueue 更改了 VisibilityTimeout:60,并为 lambda 函数 B 更改了 app.py 文件,如下所示:
import os
import boto3
from threading import Timer
from time import sleep
<some other imports>

sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")

class RepeatingTimer(Timer):
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)
def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
        sqs_client.change_message_visibility(
            QueueUrl=sqs_queue_url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=timeout
        )

def lambda_handler(event, context):
    status = "ok"

    record = event["Records"][0]
    message = json.loads(record["body"])
    receipt_handle = record["receiptHandle"]
    
    t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
    t.start() 

    <some business logic here>
    
    increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
    t.cancel()

    return status

处理来自 Amazon SQS 队列的消息的正常过程是:

  • 进程向 SQS 队列发送消息
  • 由于您有一个在 SQS 队列上带有触发器的 AWS Lambda 函数,并且它配置了 批量大小 1,因此只会将一条消息传递给 Lambda函数
  • 此外,由于 Lambda 函数配置了 预留并发 1,因此不会有并行处理——所有消息都将由 Lambda 的一个实例处理函数
  • 当消息传递给 Lambda 函数时,隐形超时开始
  • 当Lambda函数成功退出后,Lambda服务会从SQS队列中删除消息
  • 但是,如果在 之前 Lambda 函数完成处理消息,则消息将重新出现在队列中(期望消息无法处理)。这将导致将来再次处理该消息。
  • 也可以向 Amazon SQS 发送一个 heartbeat 来告诉它消息仍在处理中并且 延长隐身超时

一般来说,隐身超时应该设置得足够高,以确保它仅在处理出现问题时触发。例如,如果处理一条消息通常需要 1 分钟,您可以将隐身期设置为 4 分钟。这允许处理可能需要更长时间的异常情况,而不会在实际成功处理消息时意外地重新处理消息。

你的情况

您提到 “lambda 会等待 SQS 队列超时过去以开始处理下一条消息”。这表明您的过程中发生了一些奇怪的事情。通常,当 Lambda 函数无错退出时,Lambda 服务将使用下一条消息再次调用 Lambda 函数。在消息不可见期和执行 Lambda 函数之间没有直接的 link——也就是说,根据您的配置,当达到消息超时时,没有任何东西会 'kill' 现有的 Lambda 函数,也没有任何东西会延迟下一次 Lambda 调用,直到发生消息超时。但是,当达到 Lambda 函数超时时,函数 变为 'killed'。因此,听起来您的 Lambda 函数在完成处理消息 之前是 timing-out。您应该将 Lambda 函数的超时时间增加到比预期 运行 持续时间大几倍。 (这是一个独立于 SQS 队列隐身期的配置。)