AWS Lambda 从 SQS 队列读取
AWS Lambda to read from SQS queue
我有一个 AWS Lambda 函数可以从 SQS 队列中读取数据。 lambda 逻辑基本上是从 SQS 读取一条消息,然后处理并删除该消息。读取消息的代码类似于。
ReceiveMessageRequest messageRequest =
new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(5).withMaxNumberOfMessages(1);
现在我的问题是什么是触发此 lambda 的最佳方式以及此 lambda 如何缩放,例如,如果队列中有 1000 条消息,那么是否会有 1000 个 lambda 运行 ,因为在我的例子中,一个 lambda 只能从队列中读取一条消息。
有关此类设计的最佳实践的任何指示。
现在你最好的选择可能是设置一个 AWS Cloudwatch 事件规则,在你需要的时间间隔调用 lambda 函数。
这是一个来自 AWS 的示例应用程序来执行此操作:
https://github.com/awslabs/aws-serverless-sqs-event-source
我相信 AWS 最终会支持 SQS 作为 AWS lambda 的事件类型,这应该会让这更容易,但现在你最好的选择可能是我上面链接的代码版本。
可能有几种方法可以做到这一点,但我发现 this guide 在我尝试实现您在 Node.js 中描述的同类功能时非常有用。这种策略的一个缺点是您只能每 60 秒轮询一次队列。
基本工作流程如下所示:
设置当队列中有一定数量的消息时触发的 CloudWatch 警报。
Cloudwatch 警报然后发布到 SNS
SNS 消息触发 Lambda scale() 函数
scale() 函数更新 DynamoDB table 中设置所需工作进程数的配置记录
然后您有一个主 CloudWatch 计划,它每 60 秒调用一次 worker() 函数
worker() 函数从 DynamoDB 读取配置,根据队列大小确定需要多少并发进程。
Worker() 然后调用适当数量的 process() 函数
Process() 函数使用来自 SQS 的消息,执行您的主要应用程序逻辑,然后从队列中删除项目。
您可以在 Node.js here
中找到缩放函数的示例
我已经在生产环境中使用这个解决方案将近一年没有出现任何问题,即使队列中有数千条消息。如果你剪掉缩放部分,它一次只会发送一条消息。
AWS 于 2018 年 6 月添加了本机支持:https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
我们现在可以使用 SQS 消息来触发 AWS Lambda 函数。此外,不再需要 运行 消息轮询服务或创建 SQS 到 SNS 映射。
更多详情:
https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
我有一个 AWS Lambda 函数可以从 SQS 队列中读取数据。 lambda 逻辑基本上是从 SQS 读取一条消息,然后处理并删除该消息。读取消息的代码类似于。
ReceiveMessageRequest messageRequest =
new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(5).withMaxNumberOfMessages(1);
现在我的问题是什么是触发此 lambda 的最佳方式以及此 lambda 如何缩放,例如,如果队列中有 1000 条消息,那么是否会有 1000 个 lambda 运行 ,因为在我的例子中,一个 lambda 只能从队列中读取一条消息。
有关此类设计的最佳实践的任何指示。
现在你最好的选择可能是设置一个 AWS Cloudwatch 事件规则,在你需要的时间间隔调用 lambda 函数。
这是一个来自 AWS 的示例应用程序来执行此操作:
https://github.com/awslabs/aws-serverless-sqs-event-source
我相信 AWS 最终会支持 SQS 作为 AWS lambda 的事件类型,这应该会让这更容易,但现在你最好的选择可能是我上面链接的代码版本。
可能有几种方法可以做到这一点,但我发现 this guide 在我尝试实现您在 Node.js 中描述的同类功能时非常有用。这种策略的一个缺点是您只能每 60 秒轮询一次队列。
基本工作流程如下所示:
设置当队列中有一定数量的消息时触发的 CloudWatch 警报。
Cloudwatch 警报然后发布到 SNS
SNS 消息触发 Lambda scale() 函数
scale() 函数更新 DynamoDB table 中设置所需工作进程数的配置记录
然后您有一个主 CloudWatch 计划,它每 60 秒调用一次 worker() 函数
worker() 函数从 DynamoDB 读取配置,根据队列大小确定需要多少并发进程。
Worker() 然后调用适当数量的 process() 函数
Process() 函数使用来自 SQS 的消息,执行您的主要应用程序逻辑,然后从队列中删除项目。
您可以在 Node.js here
中找到缩放函数的示例我已经在生产环境中使用这个解决方案将近一年没有出现任何问题,即使队列中有数千条消息。如果你剪掉缩放部分,它一次只会发送一条消息。
AWS 于 2018 年 6 月添加了本机支持:https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
我们现在可以使用 SQS 消息来触发 AWS Lambda 函数。此外,不再需要 运行 消息轮询服务或创建 SQS 到 SNS 映射。
更多详情:
https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html