去除来自 SQS 的消息

Debouncing messages from SQS

我需要构建一个精简服务,其工作是:

通过 HTTP post 向客户端发送更新通知网络挂钩,通知他们作业已更新。

ID 为 XYZ 的作业由 10 - 1000 个较小的部分组成,我们需要更新它们的状态。 1000 个零件的工作,可能需要 2 分钟我们只想每 10 秒左右更新一次(因此我们将发送此更新 webhook 12 次)。

我计划通过 SQS 对来自工作单元的更新消息进行排队,然后出队,执行去抖动并发送 webhook。我的问题是,我不知道如何对特定标识符 XYZ 执行去抖。

有没有人有任何想法或经验可以分享?如何执行去抖?

从严格意义上讲,我怀疑 "debouncing" 在这里并不完全正确,因为它在技术上指的是避免将预期的单个动作误解为多个动作(按曲目前进 ⏭ 一次,但无意中跳过两首歌曲而不是一首)因为按钮 "bounced" —— 而你想要避免的不是真正的重复消息,而是不必要的闲聊,可以合并为代表最新或最来自触发器集合的相关内容。

当然,这很像去抖动,因为它从根本上归结为“记住”你上次做某件事的任务,并且在某个特定的时间之前不允许相同或相似的事情再次发生时间过去了。

您可能正在寻找的相关数据结构是 associative array,它有许多其他名称,例如 hash map 并存储键(例如作业标识符) 和值(例如您触摸作业(触发事件)时的最后时间戳)。

但是当你意识到接下来的事情并决定不立即采取行动时,它会变得有点诡计,但是你无法保证是否或何时可能有更多后续相关的事情,所以你需要计时器,这样你就不会遗漏最后一件事,或者不恰当地延迟它。当已知完成特定工作时,您需要清理内存结构,并定期清除它以查找废弃的密钥,以处理由于任何原因从未正确完成的工作的情况......或者你最终会运行 内存不足。并且您必须了解事件的到达顺序,特别是如果有一个最终的 "done" 事件,因为在那之后收到的任何内容,无论出于何种原因,都不再相关。

当然,这一切都是可行的,但它可能会变得混乱,正如您无疑得出的结论。

最近在 SQS 中添加的功能可能在这方面有所帮助:FIFO queues

重要提示:冒着被误解的风险,我不是暗示 FIFO 队列是灵丹妙药,或者它是必要的或充分的。相反,我建议它具有一些可能有助于实现或简化任务的功能,即使其中一些功能甚至与 FIFO 队列的严格排序 FIFO 行为没有直接关系。

FIFO 队列中的每条消息都有一个 MessageGroupId property,它是您在将每条消息发送到队列时指定的不透明字母数字字符串。这可以表示特定的作业 ID——从而确保同一作业的队列消息的分组传送和按顺序传送。有用吧?

当从队列接收消息时,您在一批中收到的所有消息(根据文档)应该是具有相同 MessageGroupId 的消息...所以如果您将其设置为唯一代表作业的字符串,这意味着如果给定作业的多条消息在队列中,您将获得许多或所有消息,并按照它们发送到队列的顺序获得它们——这意味着您可以查看并可能丢弃除了最后一个,发出通知,从队列中删除消息,然后返回循环顶部并再次轮询 SQS。¹

这似乎大大简化了流程,但不一定完全解决您所描述的问题的核心问题 -- 因为下一批可能是同一工作,所以您需要发送另一批通知几乎立即。另一方面,也许这没关系,因为您可能会从队列中为同一作业读取多达 10 条消息²,因此在一个请求中,您仍然有可能消除多达 90% 的不必要消息。您的工作人员一次只会向远程端点发送一个通知。

如果您遇到不想立即变成事件的消息,您还可以通过更改消息可见性超时来利用另一个 SQS 功能——FIFO 队列显然改变了消息可见性的行为,因此所有具有给定 MessageGroupId 的消息,包括具有相同 ID 的未来消息,作为一个组一起保持不可见。 (您需要验证此行为。)

现在,以上内容可能会协同工作以从根本上完成您的需求,尽管诚然这并不是您所要求的全部 - 因为在工作量低的时候,它可能会传递比工作量大的时候更多的消息更高的工作量。如果您确实需要将外部通知传递限制为每个作业 "not more than once every n seconds",事情确实会变得有点复杂,因为当您最后一次为给定作业发送消息时,您肯定需要 "remember",这样您可以决定是否 "too soon" 发送另一个......所以你需要一个数据结构(上面提到的关联数组),当你为每个作业发送最后一条消息时,你可以 "remember" 所以您可以告诉 SQS 将一批消息隐藏多长时间...但这不太可能是一个关键结构,因为如果它的内容丢失会发生最糟糕的事情(例如通过重新启动此微服务,假设它是存储在内存中)是您发送下一条消息的速度比其他方式更快。


¹ 根据您希望在此设计中的积极性,在外循环中每次成功进行 SQS 长轮询时,如果您收到最大数量的消息,您可能需要考虑以下事实同一 MessageGroupId 可能还有更多消息在队列中等待,这意味着即使批处理中的最后一条消息必然是批处理中的最新消息 ,您收到的最后一条消息不一定是该组的最新消息,因为稍后的消息仍在队列中。考虑到这种特殊情况会增加复杂性。不考虑它只会增加您在低流量期间生成通知的频率......为了简单起见,这可能是一个可以接受的权衡。

² 根据文档,10 仍然是单个读取请求中可以接收的最大消息数,即使使用 FIFO 队列也是如此。该文档确实表明,FIFO 队列在某种意义上比传统队列更有可能接收尽可能多的消息,因为 SQS 内部的分布式扩展的实现方式有所不同。这伴随着 300 TPS 的性能限制与 FIFO 队列的权衡。由于 SQS 的内部扩展,传统队列实际上具有无限的 TPS,但代价是可能出现乱序和潜在(但很少见)的消息重复传递。