AWS Step Functions Consuming messages from SQS
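I'm consuming messages from SQS to trigger queries.
When I consume messages from SQS normally in Python, I have to delete them from the queue afterwards.
Do I have to delete the messages from SQS manually in a Step Function?
What is the best/simplest way to do this?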
I believe the SQS integration is already set up:
{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "******"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Run Analysis Queries",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)"
      }
    },
    "Run Analysis Queries": {
      "Type": "Task",
      "Parameters": {
        "ClusterIdentifier": "******",
        "Database": "prod",
        "Sql": "select * from ******"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "End": true
    }
  },
  "TimeoutSeconds": 3600
}
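I just tested it, and the message seemed to disappear from the queue temporarily and then reappear.
Would the best approach be to insert a Lambda between the "ReceiveMessage from SQS" step and the Redshift step?
This raises another question: so far I have only run this manually. How do I eventually make this Step Function run on every incoming message?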
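If you must use SQS, you will need a Lambda function to act as a proxy. Configure the queue as a Lambda trigger, and write a Lambda that parses each SQS message and makes the appropriate call to the Step Functions StartExecution API.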
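A minimal sketch of such a proxy in Python, assuming a hypothetical STATE_MACHINE_ARN environment variable and a state machine that expects the parsed message under a "body" key (mirroring the ResultSelector in the question). With an SQS trigger, Lambda's event source mapping deletes the messages of a batch once the function returns successfully, so the state machine itself would no longer need receiveMessage or deleteMessage states. The returned batchItemFailures list is only honoured if ReportBatchItemFailures is enabled on the trigger; without it, a normal return counts as success for the whole batch and every message is deleted. The optional sfn_client parameter exists only so the function can be exercised with a fake client.

```python
import json
import os


def handler(event, context, sfn_client=None):
    """Start one Step Functions execution per SQS record in a Lambda SQS event.

    STATE_MACHINE_ARN is a hypothetical environment variable. The return value
    is a partial-batch response: only records listed in batchItemFailures stay
    in the queue (requires ReportBatchItemFailures on the event source mapping).
    """
    if sfn_client is None:
        import boto3  # bundled with the AWS Lambda Python runtime
        sfn_client = boto3.client("stepfunctions")

    state_machine_arn = os.environ["STATE_MACHINE_ARN"]
    failures = []
    for record in event.get("Records", []):
        try:
            # Parse the body first so malformed messages are reported, not forwarded.
            payload = json.loads(record["body"])
            sfn_client.start_execution(
                stateMachineArn=state_machine_arn,
                input=json.dumps({"body": payload}),
            )
        except Exception:
            # Mark only this record as failed; it becomes visible again later.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Passing a parsed "body" keeps the execution input in the same shape the question's ResultSelector produced, so the Redshift step could reference $.body the same way.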
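Once you have consumed a message, you must delete it with sqs:deleteMessage. The reason you see it reappear in the queue is that once an application reads a message, SQS hides it for the duration of the visibility timeout (30 seconds by default) so that other consumers do not process it at the same time. If it is not deleted within that window, it becomes visible again.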
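The reappearing message can be illustrated with a small in-memory model. This is a toy written for this explanation, not the SQS API: ToyQueue and its send, receive and delete methods are hypothetical, time is passed in explicitly to keep it deterministic, and the 30-second default mirrors the SQS default visibility timeout.

```python
import itertools


class ToyQueue:
    """In-memory toy model of SQS visibility timeout; not the real SQS API."""

    def __init__(self, visibility_timeout=30.0):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # message_id -> {"body": ..., "invisible_until": float}
        self._handles = {}   # receipt_handle -> message_id
        self._counter = itertools.count()

    def send(self, body):
        message_id = f"m{next(self._counter)}"
        self._messages[message_id] = {"body": body, "invisible_until": 0.0}
        return message_id

    def receive(self, now):
        """Return (receipt_handle, body) for the first visible message, or None."""
        for message_id, msg in self._messages.items():
            if msg["invisible_until"] <= now:
                # Receiving does not remove the message; it only hides it.
                msg["invisible_until"] = now + self.visibility_timeout
                handle = f"rh{next(self._counter)}"
                self._handles[handle] = message_id
                return handle, msg["body"]
        return None

    def delete(self, receipt_handle):
        """Remove the message for good, using a handle returned by receive()."""
        message_id = self._handles.pop(receipt_handle)
        self._messages.pop(message_id, None)


q = ToyQueue(visibility_timeout=30)
q.send('{"query": "daily"}')
first = q.receive(now=0)     # message is now hidden (in flight) until t=30
print(q.receive(now=10))     # None: still hidden
second = q.receive(now=31)   # visible again because it was never deleted
q.delete(second[0])          # delete with the handle from the latest receive
print(q.receive(now=100))    # None: the message is gone
```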
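Below is an example of how to read, process, and delete a message from the queue. Note that I set MaxNumberOfMessages to 1 and gave the Redshift step a ResultPath other than $, so that its output does not overwrite the data received from SQS.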
{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      "Type": "Task",
      "Parameters": {
        "MaxNumberOfMessages": 1,
        "QueueUrl": "******"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Run Analysis Queries",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)",
        "receiptHandle.$": "$.Messages[0].ReceiptHandle"
      }
    },
    "Run Analysis Queries": {
      "Type": "Task",
      "Parameters": {
        "ClusterIdentifier": "******",
        "Database": "prod",
        "Sql": "select * from ******"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "ResultPath": "$.redshift_output",
      "Next": "delete_sqs"
    },
    "delete_sqs": {
      "Comment": "Deletes SQS message",
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
      "Parameters": {
        "ReceiptHandle.$": "$.receiptHandle",
        "QueueUrl": "******"
      },
      "ResultPath": null,
      "End": true
    }
  }
}
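Why the ResultPath matters can be checked with a small Python emulation of how ResultSelector and ResultPath shape the data passed between states. apply_result_path is a hypothetical helper that only handles "$", null and a single top-level key, and the SQS and Redshift values are made up; the point is that delete_sqs can only delete the message if a receipt handle is still present in its input after the Redshift step.

```python
import json


def apply_result_path(state_input, result, result_path):
    """Emulate ASL ResultPath for "$", None (null) or a single top-level "$.key".

    A simplified illustration, not a full JSONPath implementation.
    """
    if result_path == "$":
        return result                    # result replaces the whole input
    if result_path is None:
        return state_input               # result discarded, input passed through
    if not result_path.startswith("$.") or "." in result_path[2:]:
        raise ValueError("only '$', None or '$.key' are supported in this sketch")
    output = dict(state_input)
    output[result_path[2:]] = result     # result stored under one key
    return output


# Shape of an sqs:receiveMessage result (values are made up).
receive_result = {
    "Messages": [
        {"MessageId": "m-1", "ReceiptHandle": "rh-1", "Body": '{"report": "daily"}'}
    ]
}
message = receive_result["Messages"][0]

# A ResultSelector that keeps only the parsed body drops the receipt handle...
body_only = {"body": json.loads(message["Body"])}
# ...while one that also selects it keeps it available for delete_sqs.
with_handle = {"body": json.loads(message["Body"]), "receiptHandle": message["ReceiptHandle"]}

redshift_result = {"Id": "statement-id"}  # made-up executeStatement output

for selected in (body_only, with_handle):
    # A Task with a ResultSelector but no ResultPath uses the default "$".
    after_receive = apply_result_path({}, selected, "$")
    after_redshift = apply_result_path(after_receive, redshift_result, "$.redshift_output")
    print(sorted(after_redshift), "receiptHandle" in after_redshift)

# With the default ResultPath "$" on the Redshift step, everything else is lost.
print(apply_result_path(with_handle, redshift_result, "$"))
```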
You can also read up to 10 messages at a time by setting MaxNumberOfMessages to 10 and adding a Map step, as in this example:
{
  "StartAt": "read_sqs",
  "States": {
    "read_sqs": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Parameters": {
        "MaxNumberOfMessages": 10,
        "QueueUrl": "*******"
      },
      "ResultPath": "$.queueResponse",
      "Next": "check_results"
    },
    "check_results": {
      "Comment": "Checking if queue is empty",
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.queueResponse.Messages[0]",
          "IsPresent": true,
          "Next": "map_results"
        }
      ],
      "Default": "exit"
    },
    "map_results": {
      "Comment": "Performs a 'map' operation over each payload",
      "Type": "Map",
      "ItemsPath": "$.queueResponse.Messages",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "read_request",
        "States": {
          "read_request": {
            "Comment": "Parses and moves the request body into the response",
            "Type": "Pass",
            "Parameters": {
              "requestBody.$": "States.StringToJson($.Body)"
            },
            "ResultPath": "$.map_response",
            "Next": "Run Analysis Queries"
          },
          "Run Analysis Queries": {
            "Type": "Task",
            "Parameters": {
              "ClusterIdentifier": "******",
              "Database": "prod",
              "Sql": "select * from ******"
            },
            "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
            "ResultPath": "$.redshift_output",
            "Next": "delete_sqs"
          },
          "delete_sqs": {
            "Comment": "Deletes SQS message",
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
            "Parameters": {
              "ReceiptHandle.$": "$.ReceiptHandle",
              "QueueUrl": "*******"
            },
            "ResultPath": null,
            "End": true
          }
        }
      },
      "ResultPath": "$.flowResponse",
      "Next": "exit"
    },
    "exit": {
      "Type": "Pass",
      "End": true
    }
  }
}
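For readers more at home in Python than in ASL, the control flow of the Map example above roughly corresponds to the loop below. process_batch, run_query and delete_message are hypothetical names standing in for the states with the same purpose; the real Map state runs up to MaxConcurrency iterations in parallel, whereas this sketch processes them one after another.

```python
import json


def process_batch(queue_response, run_query, delete_message):
    """Plain-Python outline of read_sqs -> check_results -> map_results.

    run_query and delete_message are hypothetical callables standing in for
    the redshiftdata:executeStatement and sqs:deleteMessage tasks.
    """
    messages = queue_response.get("Messages", [])
    if not messages:                                   # check_results: empty queue -> exit
        return []
    results = []
    for message in messages:                           # map_results (sequential here)
        request_body = json.loads(message["Body"])     # read_request
        redshift_output = run_query(request_body)      # Run Analysis Queries
        delete_message(message["ReceiptHandle"])       # delete_sqs: this item's own handle
        results.append({"requestBody": request_body, "redshift_output": redshift_output})
    return results


deleted_handles = []
summary = process_batch(
    {"Messages": [{"ReceiptHandle": "rh-1", "Body": '{"report": "daily"}'}]},
    run_query=lambda body: {"Id": "statement-id"},     # stand-in for executeStatement
    delete_message=deleted_handles.append,             # stand-in for deleteMessage
)
print(summary, deleted_handles)
```

Because each iteration deletes its message using that message's own ReceiptHandle, a message whose query step fails never reaches delete_sqs and becomes visible again after the visibility timeout. Note also that redshiftdata:executeStatement returns as soon as the statement has been submitted, so in both the state machine and this sketch the message is deleted once the query has been accepted, not once it has finished.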