在 SQS 队列中使用多个消费者
Using many consumers in SQS Queue
我知道可以使用多个线程来使用 SQS 队列。我想保证每条消息都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如连接速度慢),则其他线程可以使用相同的消息。
保证一条消息被处理一次的最佳方法是什么?
当您收到消息时,将消息或对消息的引用存储在对消息 ID 具有唯一约束的数据库中。如果 ID 存在于 table 中,您已经收到它,并且数据库将不允许您再次插入它 -- 因为唯一约束。
AWS SQS API 不会在您使用 API 等阅读消息时自动 "consume" 消息。需要开发者自行调用删除消息
SQS 确实有一个功能调用 "redrive policy" 作为 "Dead letter Queue Setting" 的一部分。您只需将读取请求设置为1。如果消费进程崩溃,后续读取同一消息会将消息放入死信队列。
SQS 队列可见性超时最多可设置为 12 小时。除非您有特殊需要,否则您需要实现将消息处理程序存储在数据库中以供检查的过程。
您可以对消息和批处理使用 setVisibilityTimeout(),以延长可见时间,直到线程完成对消息的处理。
这可以通过使用 scheduledExecutorService 来完成,并在初始可见时间减半后安排一个可运行的事件。下面的代码片段创建并执行 VisibilityTimeExtender,每隔一半的 visibilityTime,周期为可见时间的一半。 (时间要保证消息被处理,用visibilityTime/2扩展)
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);
VisibilityTimeExtender 必须实现 Runnable,并且是您更新新可见性时间的地方。
当线程处理完消息后,您可以将其从队列中删除,并调用 futureEvent.cancel(true) 来停止计划的事件。
What is the best approach to guarantee that a message will be processed once?
您要求保证 - 您不会得到保证。您可以将消息被多次处理的可能性降低到 非常小的数量 ,但您不会得到 保证 。
我将解释原因,以及减少重复的策略。
重复从何而来
- 当您将一条消息放入 SQS 时,SQS 实际上可能不止一次收到该消息
- 例如:在发送消息时出现轻微的网络故障导致了自动重试的暂时性错误 - 从消息发送者的角度来看,它失败了一次,成功发送了一次,但是 SQS 收到了两条消息。
- SQS can internally generate duplicates
- 类似于第一个示例 - 有很多计算机在幕后处理消息,SQS 需要确保没有任何东西丢失 - 消息存储在多个服务器上,这会导致重复。
在大多数情况下,通过利用 SQS message visibility timeout,从这些来源复制的机会已经非常小 - 很小的百分之几。
如果处理重复确实那么糟糕(strive to make your message consumption idempotent!),我认为这已经足够好了 - 进一步减少重复的机会很复杂并且可能贵...
您的应用程序可以做什么来进一步减少重复?
好吧,我们现在进入兔子洞...在较高的层次上,您将希望为您的消息分配唯一的 ID,并在开始处理之前检查正在进行或已完成的 ID 的原子缓存:
- 确保您的消息具有在插入时提供的唯一标识符
- 没有这个,您将无法区分重复项。
- 在 'end of the line' 处理消息的重复。
- 如果您的消息接收方需要发送消息以进行进一步处理,那么它可能是另一个重复来源(出于与上述类似的原因)
- 您需要在某处自动存储和检查这些唯一 ID(并在超时后刷新它们)。有两个重要的状态:"InProgress"和"Completed"
- InProgress 条目应该有一个超时时间,该超时时间取决于处理失败时您需要恢复的速度。
- 已完成的条目应该有一个超时时间,具体取决于您希望重复数据删除的时间长短 window
- 最简单的可能是 Guava cache,但只适用于单个处理应用程序。如果你有很多消息或分布式消费,考虑这个工作的数据库(有一个后台进程来扫描过期的条目)
- 在处理消息之前,尝试将 messageId 存储在 "InProgress" 中。如果它已经存在,请停止 - 你刚刚处理了一个副本。
- 检查消息是否为 "Completed"(如果存在则停止)
- 您的线程现在对该 messageId 具有独占锁定 - 处理您的消息
- 将 messageId 标记为 "Completed" - 只要此 messageId 保留在这里,您就不会处理该 messageId 的任何重复项。
- 不过您可能负担不起无限存储空间。
- 从 "InProgress" 中删除 messageId(或者让它从这里过期)
一些笔记
- 请记住,没有所有这些的重复的机会已经很低了。根据对消息进行重复数据删除的时间和金钱对您来说值得多少时间和金钱,可以随意跳过或修改任何步骤
- 例如,您可以省略 "InProgress",但这会增加两个线程同时处理重复消息的小几率(第二个在第一个之前开始 "Completed"它)
- 你的重复数据删除window只要你能在"Completed"中保留messageIds。由于您可能负担不起无限存储,因此至少将其持续时间设为 SQS 消息可见性超时的 2 倍;之后重复的机会减少了(除了已经很低的机会,但仍然不能保证)。
- 即便如此,仍然存在重复的机会 - 所有的预防措施和 SQS 消息可见性超时都有助于将这种机会减少到非常小,但机会仍然存在:
- 您的应用程序可以 crash/hang/do 在处理完消息之后,但在 messageId 为 "Completed" 之前立即进行一次很长的 GC(也许您正在使用数据库进行此存储并且与它的连接是向下)
- 在这种情况下,"Processing" 最终将过期,并且另一个线程可以处理此消息(在 SQS 可见性超时也过期之后或者因为 SQS 中有重复项)。
我知道可以使用多个线程来使用 SQS 队列。我想保证每条消息都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如连接速度慢),则其他线程可以使用相同的消息。
保证一条消息被处理一次的最佳方法是什么?
当您收到消息时,将消息或对消息的引用存储在对消息 ID 具有唯一约束的数据库中。如果 ID 存在于 table 中,您已经收到它,并且数据库将不允许您再次插入它 -- 因为唯一约束。
AWS SQS API 不会在您使用 API 等阅读消息时自动 "consume" 消息。需要开发者自行调用删除消息
SQS 确实有一个功能调用 "redrive policy" 作为 "Dead letter Queue Setting" 的一部分。您只需将读取请求设置为1。如果消费进程崩溃,后续读取同一消息会将消息放入死信队列。
SQS 队列可见性超时最多可设置为 12 小时。除非您有特殊需要,否则您需要实现将消息处理程序存储在数据库中以供检查的过程。
您可以对消息和批处理使用 setVisibilityTimeout(),以延长可见时间,直到线程完成对消息的处理。
这可以通过使用 scheduledExecutorService 来完成,并在初始可见时间减半后安排一个可运行的事件。下面的代码片段创建并执行 VisibilityTimeExtender,每隔一半的 visibilityTime,周期为可见时间的一半。 (时间要保证消息被处理,用visibilityTime/2扩展)
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);
VisibilityTimeExtender 必须实现 Runnable,并且是您更新新可见性时间的地方。
当线程处理完消息后,您可以将其从队列中删除,并调用 futureEvent.cancel(true) 来停止计划的事件。
What is the best approach to guarantee that a message will be processed once?
您要求保证 - 您不会得到保证。您可以将消息被多次处理的可能性降低到 非常小的数量 ,但您不会得到 保证 。
我将解释原因,以及减少重复的策略。
重复从何而来
- 当您将一条消息放入 SQS 时,SQS 实际上可能不止一次收到该消息
- 例如:在发送消息时出现轻微的网络故障导致了自动重试的暂时性错误 - 从消息发送者的角度来看,它失败了一次,成功发送了一次,但是 SQS 收到了两条消息。
- SQS can internally generate duplicates
- 类似于第一个示例 - 有很多计算机在幕后处理消息,SQS 需要确保没有任何东西丢失 - 消息存储在多个服务器上,这会导致重复。
在大多数情况下,通过利用 SQS message visibility timeout,从这些来源复制的机会已经非常小 - 很小的百分之几。
如果处理重复确实那么糟糕(strive to make your message consumption idempotent!),我认为这已经足够好了 - 进一步减少重复的机会很复杂并且可能贵...
您的应用程序可以做什么来进一步减少重复?
好吧,我们现在进入兔子洞...在较高的层次上,您将希望为您的消息分配唯一的 ID,并在开始处理之前检查正在进行或已完成的 ID 的原子缓存:
- 确保您的消息具有在插入时提供的唯一标识符
- 没有这个,您将无法区分重复项。
- 在 'end of the line' 处理消息的重复。
- 如果您的消息接收方需要发送消息以进行进一步处理,那么它可能是另一个重复来源(出于与上述类似的原因)
- 您需要在某处自动存储和检查这些唯一 ID(并在超时后刷新它们)。有两个重要的状态:"InProgress"和"Completed"
- InProgress 条目应该有一个超时时间,该超时时间取决于处理失败时您需要恢复的速度。
- 已完成的条目应该有一个超时时间,具体取决于您希望重复数据删除的时间长短 window
- 最简单的可能是 Guava cache,但只适用于单个处理应用程序。如果你有很多消息或分布式消费,考虑这个工作的数据库(有一个后台进程来扫描过期的条目)
- 在处理消息之前,尝试将 messageId 存储在 "InProgress" 中。如果它已经存在,请停止 - 你刚刚处理了一个副本。
- 检查消息是否为 "Completed"(如果存在则停止)
- 您的线程现在对该 messageId 具有独占锁定 - 处理您的消息
- 将 messageId 标记为 "Completed" - 只要此 messageId 保留在这里,您就不会处理该 messageId 的任何重复项。
- 不过您可能负担不起无限存储空间。
- 从 "InProgress" 中删除 messageId(或者让它从这里过期)
一些笔记
- 请记住,没有所有这些的重复的机会已经很低了。根据对消息进行重复数据删除的时间和金钱对您来说值得多少时间和金钱,可以随意跳过或修改任何步骤
- 例如,您可以省略 "InProgress",但这会增加两个线程同时处理重复消息的小几率(第二个在第一个之前开始 "Completed"它)
- 你的重复数据删除window只要你能在"Completed"中保留messageIds。由于您可能负担不起无限存储,因此至少将其持续时间设为 SQS 消息可见性超时的 2 倍;之后重复的机会减少了(除了已经很低的机会,但仍然不能保证)。
- 即便如此,仍然存在重复的机会 - 所有的预防措施和 SQS 消息可见性超时都有助于将这种机会减少到非常小,但机会仍然存在:
- 您的应用程序可以 crash/hang/do 在处理完消息之后,但在 messageId 为 "Completed" 之前立即进行一次很长的 GC(也许您正在使用数据库进行此存储并且与它的连接是向下)
- 在这种情况下,"Processing" 最终将过期,并且另一个线程可以处理此消息(在 SQS 可见性超时也过期之后或者因为 SQS 中有重复项)。