AWS Fifo 在 Lambda 节流时停止

AWS Fifo Halts on Lambda Throttle

我们用一个Fifo Queue configured with a Lambda Function as a processor. We use MessageGoupId and BatchSize to optimistically remove redundant messages. To rate limit processing we use reserved concurrency. Our function timeout has to be high. The queue maximum receives设置为十。

观察到

当队列中有大量消息时,Lambda 函数会扩展。一旦它扩展到足以节流,队列处理就会完全停止,直到几分钟后才会处理进一步的消息。

我假设这是因为限制,因为停止总是与限制同时发生,并且当增加预留并发时,处理停止需要更长的时间。

我假设队列再次启动的时间与 lambda retry limit, the function timeout and the queue visibility timeout 有关。但由于我不知道到底发生了什么,所以这是一个猜测。

问题

没有记录任何错误,最终所有消息都得到处理,但由于处理对时间和吞吐量敏感,因此让队列暂停几分钟是不可接受的。

问题

这是怎么回事,我们该如何解决?如果需要更多信息,我很乐意进一步调试。


编辑: 发现:To allow your function time to process each batch of records, set the source queue's visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a previous batch. 我们肯定违反了,但我不确定这如何/是否解释了观察到的行为。

如何复制

在这个答案的末尾有一个完整的、最小的示例,可以轻松重现该问题。

要部署,请创建所有文件并将您的 aws profile 和所需的 region 填写到所有 sh 文件中。

然后运行

. deploy-stack.sh

创建包含所有必要资源的 cloudformation 堆栈。

然后打开AWS网络界面(SQS)和运行

. generate-messages.sh

在队列中生成消息。

然后可以看到大约一半的消息在函数节流和队列完全停止之前得到处理。

在所有调试完成后删除 cloudformation 堆栈 运行 remove-stack.sh

解决方案

AWS 文档contains a note 说法

To allow your function time to process each batch of records, set the source queue's visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a previous batch.

将 lambda 函数上的 timeout600 更改为 100 并重新部署堆栈允许所有消息正确处理,即使 lambda 函数节流。

我无法解释为什么会出现这种行为,非常感谢对此的反馈。然而,以上确实解决了所描述的问题。

文件

stack.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Debug Stack for Fifo with Lambda Processor
Resources:
  MyLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: lambda-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonSqsFullAccess
      Path: /
  MySqsQueue:
      Type: 'AWS::SQS::Queue'
      Properties:
        FifoQueue: true
        VisibilityTimeout: 600
  MySQSQueueFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt MyLambdaRole.Arn
      Runtime: nodejs12.x
      Timeout: 600
      ReservedConcurrentExecutions: 5
      Code:
        ZipFile: |
          exports.handler = (event, context) => new Promise((resolve) => {
            setTimeout(resolve, 1000);
          });
  MySQSLambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: false
      EventSourceArn: !GetAtt MySqsQueue.Arn
      FunctionName: !Ref MySQSQueueFunction
Outputs:
  QueueUrl:
    Value:
      Ref: MySqsQueue
  EventSource:
    Value:
      Ref: MySQSLambdaEventSource

deploy-stack.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

aws cloudformation deploy \
--profile $profile \
--region $region \
--template-file stack.yaml \
--stack-name fifo-lambda-debug \
--capabilities CAPABILITY_NAMED_IAM

generate-messages.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

function genGroupId {
  echo $(shuf -i 1-10 -n 1)
}
function genRndStr {
  echo $(openssl rand -hex 12)
}
function entry {
  echo "{\"Id\":\"$(genRndStr)\",\"MessageBody\":\"$(genRndStr)\",\"MessageGroupId\":\"$(genGroupId)\",\"MessageDeduplicationId\":\"$(genRndStr)\"}"
}

# -----------------

echo "Getting Subscription UUID..."
eventSource=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='EventSource'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)

echo "Getting Queue Url..."
queueUrl=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='QueueUrl'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)

echo "Disabling Subscription"
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--no-enabled \
> /dev/null

while : ; do
    echo "Waiting until Subscription disabled..."
    [[ $(aws lambda get-event-source-mapping \
      --profile $profile \
      --region $region \
      --uuid $eventSource \
      --query "State") != '"Disabled"' ]] || break
    sleep 10
done

echo "Queueing Messages..."
for i in {1..30}
do
  aws sqs send-message-batch \
  --profile $profile \
  --region $region \
  --queue-url "$queueUrl" \
  --entries "[$(entry),$(entry),$(entry),$(entry),$(entry),$(entry),$(entry),$(entry),$(entry),$(entry)]" \
  > /dev/null
  echo "Done: $i / 30"
done

echo "Re-Enabling Subscription..."
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--enabled \
> /dev/null

remove-stack.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

aws cloudformation delete-stack \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug