AWS Step Functions 使用来自 SQS 的消息

AWS Step Functions Consuming messages from SQS

我正在使用来自 SQS 的消息来触发查询。 当我在Python中正常消费来自SQS的消息时,我需要从SQS中删除消息。 我是否必须在 Step Function 中手动从 SQS 中删除消息? best/simplest 这样做的方法是什么?

我相信 SQS 已经完成了集成:

{
  "Comment": "Run Redshift Queries",
  "StartAt": "ReceiveMessage from SQS",
  "States": {
    "ReceiveMessage from SQS": {
      "Type": "Task",
      "Parameters": {
        "QueueUrl": "******"
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
      "Next": "Run Analysis Queries",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Messages[0].Body)"
      }
    },
    "Run Analysis Queries": {
      "Type": "Task",
      "Parameters": {
        "ClusterIdentifier": "******",
        "Database": "prod",
        "Sql": "select * from ******"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "End": true
    }
  },
  "TimeoutSeconds": 3600
}

我刚刚测试了一下,好像消息暂时下线了,然后又上线了。

在“ReceiveMessage from SQS”阶段和 Redshift 阶段之间插入 Lambda 的最佳方式是什么?

这引发了另一个问题。我手动只有 运行 这个。如何在任何消息上最终激活此 Step Function 到 运行?

如果您必须使用 SQS,那么您将需要一个 lambda 函数来充当代理。您需要将队列设置为 lambda 触发器,并且需要编写一个 lambda 来解析 SQS 消息并对 Step Functions StartExecution 进行适当的调用 API.

使用消息后,您必须使用 sqs:deleteMessage 将其删除。您看到它重新出现在队列中的原因是,一旦它被应用程序读取,它就会隐藏约 30 秒,以避免其他应用程序同时处理它。

下面是一个如何从队列中读取、处理和删除消息的示例。请注意,我添加了 MaxNumberOfMessages 等于 1ResultPath 不同于 $

"ReceiveMessage from SQS": {
  "Type": "Task",
  "Parameters": {
    "MaxNumberOfMessages": 1,
    "QueueUrl": "******"
  },
  "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
  "Next": "Run Analysis Queries",
  "ResultSelector": {
    "body.$": "States.StringToJson($.Messages[0].Body)"
  }
},
"Run Analysis Queries": {
  "Type": "Task",
  "Parameters": {
    "ClusterIdentifier": "******",
    "Database": "prod",
    "Sql": "select * from ******"
  },
  "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
  "ResultPath": "$.redshift_output",
  "Next": "delete_sqs"
},
"delete_sqs": {
  "Comment": "Deletes SQS message",
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
  "Parameters": {
    "ReceiptHandle.$": "$.Messages[0].ReceiptHandle",
    "QueueUrl": "******"
  },
  "ResultPath": null,
  "Next": "update_result"
}

此外,您一次最多可以阅读 10 条消息,设置 MaxNumberOfMessages 等于 10 以及此处示例中的地图步骤:

{
    "StartAt": "read_sqs",
    "States": {
      "read_sqs": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
        "Parameters": {
          "MaxNumberOfMessages": 10,
          "QueueUrl": "*******"
        },
        "ResultPath": "$.queueResponse",
        "Next": "check_results"
      },
      "check_results": {
        "Comment": "Checking if queue is empty",
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.queueResponse.Messages[0]",
            "IsPresent": true,
            "Next": "map_results"
          }
        ],
        "Default": "exit"
      },
      "map_results": {
        "Comment": "Performs a 'map' operation over each payload",
        "Type": "Map",
        "ItemsPath": "$.queueResponse.Messages",
        "MaxConcurrency": 10,
        "Iterator": {
          "StartAt": "read_request",
          "States": {
            "read_request": {
              "Comment": "Parses and moves the request body into the response",
              "Type": "Pass",
              "Parameters": {
                "requestBody.$": "States.StringToJson($.Body)"
              },
              "ResultPath": "$.map_response",
              "Next": "Run Analysis Queries"
            },
            "Run Analysis Queries": {
              "Type": "Task",
              "Parameters": {
                "ClusterIdentifier": "******",
                "Database": "prod",
                "Sql": "select * from ******"
              },
              "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
              "ResultPath": "$.redshift_output",
              "Next": "delete_sqs"
            },
            "delete_sqs": {
              "Comment": "Deletes SQS message",
              "Type": "Task",
              "Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
              "Parameters": {
                "ReceiptHandle.$": "$.ReceiptHandle",
                "QueueUrl": "*******"
              },
              "ResultPath": null,
              "End": true
            }
          }
        },
        "ResultPath": "$.flowResponse",
        "Next": "exit"
      },
      "exit": {
        "Type": "Pass",
        "End": true
      }
    }
  }