如何在 SQS 上延迟重试 4 小时?

How to delay retry by 4 hours on SQS?

TL;DR: 如何模仿 rabbitMQ's scheduling functionality 留住消费者:

  1. 无国籍
  2. 无需管理预定消息
  3. 在接收消息和最终在正确的计划时间使用消息之间,避免了计划消息的无用重试

我有一个在创建时具有默认属性的 SQS 队列。消费者处理一条消息的平均时间为1~2s。但是一些消息需要处理两次,在 4h window 之间。这些消息称为B,其他消息称为A。

假设我的队列中包含以下消息:A1, A2, B1, A3, B2(5 条消息,最多 10 秒以消耗所有消息)在这些 table 的开头:

time     | what should happen
---------|-------------------
now      | consumer connected to queue
now+10s  | all As were consumed successfully and deleted from queue
           Bs had their unsuccessful first try and now they are waiting for their retry in 4h
between  | nothing happens since no new messages arrived and old ones are waiting
now+4h4s | Bs successfully consumed during second retry and due that, deleted from queue

我有一个 Spring 应用程序,当我发现类型为 B 的消息时,我可以抛出异常。由于简单性和可扩展性,我希望有一个单线程消费消息,花费 1~2 秒来消费每条消息。

这样,如果可能的话,我不能挂起消息处理. I also don't need SQS' Delivery delay since it postpones just the messages arriving at queue and not retries. If possible, I would like to keep using long polling @JmsListener and avoid at all keeping any state on my memory's application. I want to avoid this

您可以将消息 B 发送到 Step Functions 状态机并进入等待状态以等待 4 小时,然后再将其发送到队列。状态机会为你保持状态,你可以send messages directly to SQS from Step Functions所以你不需要写任何代码。

我会编写一个小的 AWS Lambda 函数,每~分钟调用一次。该函数将获得一条消息(从希望的 FIFO 类型的 SQS 队列中取出)并检查它被添加的时间。如果添加 >= 4 小时,它会将其从传入队列中删除,并将其添加到延迟 4 小时的队列中,您的应用程序可以收听该队列。如果它移动了一条消息,请继续这样做,直到下一条消息不超过 4 小时。 Increase/decrease lambda 的频率将 'tight' 的粒度增加到 4 小时,但代价是 运行 lambda 更频繁。

这里是一个使用 SQS 的 AWS Lambda 函数的快速 link 示例:https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-example.html

由于我将 JmsListenersetSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE) 一起使用,我决定 运行 在可重新处理消息的消费者结束时这样做:

myAmazonSqsInstance.sendMessage(
        new SendMessageRequest()
                .withQueueUrl("queueName")
                .withMessageBody(myMessageWithText)
                .withDelaySeconds(900) // 900s = 15min
);

这样,该消息将被成功使用,但将在队列中生成一条具有相同正文的新消息。这条消息将在15分钟后被消费,由于我的业务逻辑,再次失败。将有 16 次失败 (16*15min=4h) 直到它最终被消耗而不产生新消息。

虽然这不是我要的,而且和其他答案类似(只是技术栈不同),但我决定写在这里,以提供java解决方案