无服务器 SQS 消费者跳过消息

Serverless SQS consumer skips messages

我正在使用无服务器框架来使用来自 SQS 的消息。一些发送到队列的消息不会被消费。他们直接进入飞行中的 SQS 状态,然后从那里进入我的死信队列。当我查看我的消费者日志时,我可以看到它消费并成功处理了 9/10 消息。一个总是不被消费,最终进入死信队列。我将 reservedConcurrency 设置为 1,这样一次只有一个消费者可以 运行。函数消费者 timeout 设置为 30 秒。这是消费者代码:

module.exports.mySQSConsumer = async (event, context) => {
  context.callbackWaitsForEmptyEventLoop = false;

  console.log(event.Records);

  await new Promise((res, rej) => {
    setTimeout(() => {
      res();
    }, 100);
  });

  console.log('DONE');

  return true;
}

消费者函数配置如下:

functions:
  mySQSConsumer:
    handler: handler.mySQSConsumer
    timeout: 30 # seconds
    reservedConcurrency: 1
    events:
      - sqs:
          arn: arn:aws:sqs:us-east-1:xyz:my-test-queue
          batchSize: 1
          enabled: true

如果我删除 await 函数,它将处理所有消息。如果我将超时增加到 200 毫秒,更多消息将直接进入飞行状态并从那里进入死信队列。这段代码非常简单。知道为什么它会跳过某些消息吗?未使用的消息甚至不会使用第一个 console.log() 语句显示在日志中。他们似乎完全被忽略了。

我想通了。 SQS 队列 Lambda 函数事件触发的工作方式与我想象的不同。消息被推送到 Lambda 函数中,而不是被它拉取。我认为这可以由 AWS 设计得更好,但事实就是如此。

问题是 Default Visibility Timeout 设置为 30 秒,同时 Reserved Concurrency 设置为 1。当 SQS 队列很快被数千条记录填满时,AWS 开始将消息推送到 Lambda以比单个功能实例可以处理它们的速度更快的速度运行。 AWS "assumes" 它可以简单地启动更多 Lambda 实例来跟上背压。但是,并发限制不允许它启动更多实例 - Lambda 函数受到限制。因此,该函数开始将某些消息的失败返回到 AWS 后端,从而将失败的消息隐藏 30 秒(默认设置),并在这段时间后将它们放回队列中以进行重新处理。由于单个实例要处理的记录太多,30 秒后,Lambda 函数仍然很忙,无法再次处理这些消息。所以情况会重演,消息会返回隐身状态 30 秒。这总共重复 3 次。第三次尝试后,消息进入死信队列(我们以这种方式配置了 SQS 队列)。

为解决此问题,我们将 Default Visibility Timeout 增加到 5 分钟。这足以让 Lambda 函数处理队列中的大部分消息,而失败的消息则在不可见的情况下等待。 5 分钟后,它们被推回队列,由于 Lambda 函数不再繁忙,它将处理其中的大部分。其中一些必须进入隐形状态两次才能成功处理。

因此,解决此问题的方法是像我们一样增加 Default Invisibility Timeout 或增加消息进入死信队列之前所需的失败次数。

我希望这对某人有所帮助。