轮询 AWS SQS 队列并从队列中删除接收到的消息的最佳实践?
Best practice for polling an AWS SQS queue and deleting received messages from queue?
我有一个 SQS 队列,它不断地被数据消费者填充,我现在正在尝试创建服务,使用 Python 的 boto 从 SQS 中提取这些数据。
我的设计方式是让 10-20 个线程都尝试从 SQS 队列中读取消息,然后在返回队列之前对数据(业务逻辑)执行它们必须执行的操作完成后获取下一批数据。如果没有数据,他们将等待直到有一些数据可用。
对于这个设计,我有两个地方不确定
- 是否需要使用长 time_out 值调用 receive_message(),如果在 20 秒(允许的最大值)内没有返回任何内容,那么只需重试?或者是否有一种只有在数据可用时才 returns 的阻塞方法?
- 我注意到,一旦我收到一条消息,它并没有从队列中删除,我是否必须接收一条消息,然后在收到消息后发送另一个请求将其从队列中删除?似乎有点矫枉过正。
谢谢
receive_message()
方法的长轮询功能是轮询 SQS 的最有效方式。如果 returns 没有任何消息,我建议在重试之前稍作延迟,尤其是当您有多个读者时。您甚至可能想要进行增量延迟,以便每个后续的空读取等待更长的时间,这样您就不会最终受到 AWS 的限制。
是的,您必须在阅读后删除邮件,否则它会重新出现在队列中。这实际上在工作人员读取消息然后在它可以完全处理消息之前失败的情况下非常有用。在这种情况下,它将被重新排队并由另一个工作人员读取。您还需要确保将消息的不可见超时设置得足够长,以便工作人员有足够的时间在消息自动重新出现在队列中之前对其进行处理。如有必要,您的工作人员可以在处理超时时调整超时时间,前提是超时时间比预期的要长。
另一种选择是使用 AWS Beanstalk 设置工作应用程序,如 this blogpost 中所述。
您的 Flask 应用程序不是使用 boto3 进行长轮询,而是将消息作为 HTTP post 中的 json 对象接收。正在设置的 HTTP 路径和消息类型可在 AWS Elastic Beanstalk 配置选项卡中配置:
AWS Elastic Beanstalk 的额外好处是能够根据 SQS 队列的大小动态扩展工作人员的数量,以及它的部署管理优势。
This 是我发现用作模板的示例应用程序。
如果您想要一种简单的方法来设置一个侦听器,包括在处理完消息时自动删除消息,以及将异常自动推送到指定队列,您可以使用 pySqsListener 程序包.
您可以像这样设置监听器:
from sqs_listener import SqsListener
class MyListener(SqsListener):
def handle_message(self, body, attributes, messages_attributes):
run_my_function(body['param1'], body['param2']
listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
有一个标志可以从短轮询切换到长轮询 - 它都记录在 README 文件中。
免责声明:我是上述包的作者。
我有一个 SQS 队列,它不断地被数据消费者填充,我现在正在尝试创建服务,使用 Python 的 boto 从 SQS 中提取这些数据。
我的设计方式是让 10-20 个线程都尝试从 SQS 队列中读取消息,然后在返回队列之前对数据(业务逻辑)执行它们必须执行的操作完成后获取下一批数据。如果没有数据,他们将等待直到有一些数据可用。
对于这个设计,我有两个地方不确定
- 是否需要使用长 time_out 值调用 receive_message(),如果在 20 秒(允许的最大值)内没有返回任何内容,那么只需重试?或者是否有一种只有在数据可用时才 returns 的阻塞方法?
- 我注意到,一旦我收到一条消息,它并没有从队列中删除,我是否必须接收一条消息,然后在收到消息后发送另一个请求将其从队列中删除?似乎有点矫枉过正。
谢谢
receive_message()
方法的长轮询功能是轮询 SQS 的最有效方式。如果 returns 没有任何消息,我建议在重试之前稍作延迟,尤其是当您有多个读者时。您甚至可能想要进行增量延迟,以便每个后续的空读取等待更长的时间,这样您就不会最终受到 AWS 的限制。
是的,您必须在阅读后删除邮件,否则它会重新出现在队列中。这实际上在工作人员读取消息然后在它可以完全处理消息之前失败的情况下非常有用。在这种情况下,它将被重新排队并由另一个工作人员读取。您还需要确保将消息的不可见超时设置得足够长,以便工作人员有足够的时间在消息自动重新出现在队列中之前对其进行处理。如有必要,您的工作人员可以在处理超时时调整超时时间,前提是超时时间比预期的要长。
另一种选择是使用 AWS Beanstalk 设置工作应用程序,如 this blogpost 中所述。
您的 Flask 应用程序不是使用 boto3 进行长轮询,而是将消息作为 HTTP post 中的 json 对象接收。正在设置的 HTTP 路径和消息类型可在 AWS Elastic Beanstalk 配置选项卡中配置:
AWS Elastic Beanstalk 的额外好处是能够根据 SQS 队列的大小动态扩展工作人员的数量,以及它的部署管理优势。
This 是我发现用作模板的示例应用程序。
如果您想要一种简单的方法来设置一个侦听器,包括在处理完消息时自动删除消息,以及将异常自动推送到指定队列,您可以使用 pySqsListener 程序包.
您可以像这样设置监听器:
from sqs_listener import SqsListener
class MyListener(SqsListener):
def handle_message(self, body, attributes, messages_attributes):
run_my_function(body['param1'], body['param2']
listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()
有一个标志可以从短轮询切换到长轮询 - 它都记录在 README 文件中。
免责声明:我是上述包的作者。