如何在 SQS 上延迟重试 4 小时?
How to delay retry by 4 hours on SQS?
TL;DR: 如何模仿 rabbitMQ's scheduling functionality 留住消费者:
- 无国籍
- 无需管理预定消息
- 在接收消息和最终在正确的计划时间使用消息之间,避免了计划消息的无用重试
我有一个在创建时具有默认属性的 SQS 队列。消费者处理一条消息的平均时间为1~2s。但是一些消息需要处理两次,在 4h window 之间。这些消息称为B,其他消息称为A。
假设我的队列中包含以下消息:A1, A2, B1, A3, B2
(5 条消息,最多 10 秒以消耗所有消息)在这些 table 的开头:
time | what should happen
---------|-------------------
now | consumer connected to queue
now+10s | all As were consumed successfully and deleted from queue
Bs had their unsuccessful first try and now they are waiting for their retry in 4h
between | nothing happens since no new messages arrived and old ones are waiting
now+4h4s | Bs successfully consumed during second retry and due that, deleted from queue
我有一个 Spring
应用程序,当我发现类型为 B
的消息时,我可以抛出异常。由于简单性和可扩展性,我希望有一个单线程消费消息,花费 1~2 秒来消费每条消息。
这样,如果可能的话,我不能挂起消息处理. I also don't need SQS' Delivery delay
since it postpones just the messages arriving at queue and not retries. If possible, I would like to keep using long polling @JmsListener
and avoid at all keeping any state on my memory's application. I want to avoid this
您可以将消息 B 发送到 Step Functions 状态机并进入等待状态以等待 4 小时,然后再将其发送到队列。状态机会为你保持状态,你可以send messages directly to SQS from Step Functions所以你不需要写任何代码。
我会编写一个小的 AWS Lambda 函数,每~分钟调用一次。该函数将获得一条消息(从希望的 FIFO 类型的 SQS 队列中取出)并检查它被添加的时间。如果添加 >= 4 小时,它会将其从传入队列中删除,并将其添加到延迟 4 小时的队列中,您的应用程序可以收听该队列。如果它移动了一条消息,请继续这样做,直到下一条消息不超过 4 小时。 Increase/decrease lambda 的频率将 'tight' 的粒度增加到 4 小时,但代价是 运行 lambda 更频繁。
这里是一个使用 SQS 的 AWS Lambda 函数的快速 link 示例:https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-example.html
由于我将 JmsListener
与 setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
一起使用,我决定 运行 在可重新处理消息的消费者结束时这样做:
myAmazonSqsInstance.sendMessage(
new SendMessageRequest()
.withQueueUrl("queueName")
.withMessageBody(myMessageWithText)
.withDelaySeconds(900) // 900s = 15min
);
这样,该消息将被成功使用,但将在队列中生成一条具有相同正文的新消息。这条消息将在15分钟后被消费,由于我的业务逻辑,再次失败。将有 16 次失败 (16*15min=4h) 直到它最终被消耗而不产生新消息。
虽然这不是我要的,而且和其他答案类似(只是技术栈不同),但我决定写在这里,以提供java解决方案
TL;DR: 如何模仿 rabbitMQ's scheduling functionality 留住消费者:
- 无国籍
- 无需管理预定消息
- 在接收消息和最终在正确的计划时间使用消息之间,避免了计划消息的无用重试
我有一个在创建时具有默认属性的 SQS 队列。消费者处理一条消息的平均时间为1~2s。但是一些消息需要处理两次,在 4h window 之间。这些消息称为B,其他消息称为A。
假设我的队列中包含以下消息:A1, A2, B1, A3, B2
(5 条消息,最多 10 秒以消耗所有消息)在这些 table 的开头:
time | what should happen
---------|-------------------
now | consumer connected to queue
now+10s | all As were consumed successfully and deleted from queue
Bs had their unsuccessful first try and now they are waiting for their retry in 4h
between | nothing happens since no new messages arrived and old ones are waiting
now+4h4s | Bs successfully consumed during second retry and due that, deleted from queue
我有一个 Spring
应用程序,当我发现类型为 B
的消息时,我可以抛出异常。由于简单性和可扩展性,我希望有一个单线程消费消息,花费 1~2 秒来消费每条消息。
这样,如果可能的话,我不能挂起消息处理Delivery delay
since it postpones just the messages arriving at queue and not retries. If possible, I would like to keep using long polling @JmsListener
and avoid at all keeping any state on my memory's application. I want to avoid this
您可以将消息 B 发送到 Step Functions 状态机并进入等待状态以等待 4 小时,然后再将其发送到队列。状态机会为你保持状态,你可以send messages directly to SQS from Step Functions所以你不需要写任何代码。
我会编写一个小的 AWS Lambda 函数,每~分钟调用一次。该函数将获得一条消息(从希望的 FIFO 类型的 SQS 队列中取出)并检查它被添加的时间。如果添加 >= 4 小时,它会将其从传入队列中删除,并将其添加到延迟 4 小时的队列中,您的应用程序可以收听该队列。如果它移动了一条消息,请继续这样做,直到下一条消息不超过 4 小时。 Increase/decrease lambda 的频率将 'tight' 的粒度增加到 4 小时,但代价是 运行 lambda 更频繁。
这里是一个使用 SQS 的 AWS Lambda 函数的快速 link 示例:https://docs.aws.amazon.com/lambda/latest/dg/with-sqs-example.html
由于我将 JmsListener
与 setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
一起使用,我决定 运行 在可重新处理消息的消费者结束时这样做:
myAmazonSqsInstance.sendMessage(
new SendMessageRequest()
.withQueueUrl("queueName")
.withMessageBody(myMessageWithText)
.withDelaySeconds(900) // 900s = 15min
);
这样,该消息将被成功使用,但将在队列中生成一条具有相同正文的新消息。这条消息将在15分钟后被消费,由于我的业务逻辑,再次失败。将有 16 次失败 (16*15min=4h) 直到它最终被消耗而不产生新消息。
虽然这不是我要的,而且和其他答案类似(只是技术栈不同),但我决定写在这里,以提供java解决方案