SQS 队列中的批处理完成后如何通知?

How to notify once a batch in an SQS queue is done?

我在 SQS 队列和一些工作人员中有一批 n 消息。这些工作人员从队列中获取消息,对其进行处理,如果成功则将其删除。一旦所有工作人员都完成了这批 n 消息,我想执行一个额外的操作。唯一的问题是确定批处理何时完成。

一种方法是检查队列是否为空。当我查看 SQS API 时,唯一看起来接近的是您从 GetQueueAttributes 获得的 ApproximateNumberOfMessages 属性。但是,"approximate" 这个词表明它实际上并不是我想要的,它的目的更多是根据队列中大致有多少消息来增加和减少工作人员的数量。

实现我想要的目标的标准方法是什么?还是 SQS 不适合这个目的?

您可以考虑向您的工作进程添加一些代码,当它要求处理一条消息但没有任何返回时启动某种计时器;如果您的工作人员请求消息、处理消息然后删除消息,正如您所说 'batch' 只是同时收到的消息的集合,那么大概是 5 分钟(或其他用户定义的时间段)过去了,并且在重复请求后没有返回新消息,您也许可以启动您的 'after batch' 流程。如果您可以在工作进程到达队列末尾时将工作进程缩减为一个,这将更加准确(这样您可以确保其他节点没有仍在处理)。

这绝不是完美的 - 将取决于消息的流/时间以及定义什么属于 'batch' 什么不属于

的重要性。

或者,如果在前端您知道放入批处理中的消息的准确数量,则可以对已处理的消息数进行倒计时,并且当您减少到零时就知道您已关闭。

SQS 实际上并没有任何用于分组消息的内置机制。此外,SQS 不保证特定消息不会被处理超过一次[1],因此您不能简单地计算处理的消息数。

相反,您可能需要在外部数据存储中单独跟踪每条消息,然后在处理完每条消息后,检查是否还有剩余消息。

例如:

  1. 当您将组中的每条消息排入原始队列时,在外部数据库中记录 message ID 以及您自己发明的组号。
  2. worker 处理消息后,worker 应该从数据库中获取该消息的组号(或者只将组号作为 attribute 包含在原始消息中),并删除消息 ID从数据库中(如果它还没有被另一个工作人员删除,如果两个工作人员从队列中得到相同的消息,这可能会发生)。然后工作人员应将包含组号的新消息排入第二个队列。
  3. 另一个工作人员从第二个队列中读取包含组号的消息,并检查数据库以查看是否还有该组号的原始消息。如果有的话,这个工人什么都不做。如果该组没有更多消息,该工作人员将执行您的附加操作。请注意,由于 SQS 的分布式特性,这条最终消息也可能被处理多次,因此附加操作应该是幂等的(或者至少以某种方式检查它是否已经执行过)。

通过此设置,您将能够 运行 同时通过系统处理多个不相关的批次。