AWS Step-Functions SQS CreateQueue Fifo 支持

AWS Step-Functions SQS CreateQueue Fifo Support

我正在 AWS Step Functions 中设计状态机。我计划使用 SQS 来向后端 API 提供状态更新。 SQS 类型应该是 Fifo,因为消息应该按照创建的相同顺序传递。我将用户 ID 用作 QueueName 并将其格式化以使其符合 fifo 要求。

Pass State(合并在transformInputsresultpath下)

  "sqsParameters": {
    "queueName.$": "States.Format('{}.fifo',$.session.sqsQueueName)",
    "messageGroupId.$": "$.session.sqsQueueName"
  }

Amazon SQS CreateQueue 状态

{
  "QueueName.$": "$.transformInputs.sqsParameters.queueName"
}

提供给 $.session.sqsQueueName 的实际输入是 test

在Pass状态下这个名字被转换成test.fifo

以上转换工作正常,我在 CreateQueue State 的输入部分得到了我期望的任何内容。

问题:

当我执行状态机时,出现以下异常。

Error

Sqs.SqsException
Cause

Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length (Service: Sqs, Status Code: 400, Request ID: a20c3c98-239b-4686-9030-26704d0f844d, Extended Request ID: null)

我可以验证从日志的 taskscheduled 部分

传递到 API 的内容
11  TaskScheduled   CreateQueue - ...
{
  "resourceType": "aws-sdk:sqs",
  "resource": "createQueue",
  "region": "eu-masked-1",
  "parameters": {
    "QueueName": "test.fifo"
  },
  "timeoutInSeconds": null,
  "heartbeatInSeconds": null
}

然后我添加 FifoQueue 属性,如 API 文档 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html 中所述 API WorkFlow Studio 设计表单中的参数部分

{
  "QueueName.$": "$.transformInputs.sqsParameters.queueName",
  "FifoQueue" : true
}

这次当我尝试保存工作流程时出现以下异常:

There are Amazon States Language errors in your state machine definition. Fix the errors to continue.
The field "FifoQueue" is not supported by Step Functions
For more information, see Amazon States Language 

Json定义对应SQS Queue Creation State

    "CreateQueue": {
      "Type": "Task",
      "Next": "Batch SubmitJob",
      "Parameters": {
        "QueueName.$": "$.transformInputs.sqsParameters.queueName",
        "FifoQueue": true
      },
      "Resource": "arn:aws:states:::aws-sdk:sqs:createQueue",
      "ResultPath": "$.createQueue"
    }

将参数块替换为:

"Parameters": {
    "QueueName.$": "$.transformInputs.sqsParameters.queueName",
    "Attributes": {"FifoQueue": true}
  },

(我 运行 今天进入这个,文档不是很好...)

这是一种从对我有用的内部步进函数创建 FIFO 队列的方法。 (Amazon SQS:CreateQueue 阶段)

{
  "QueueName.$": "States.Format('{}.fifo',$.sqsQueueName)",
  "Attributes": {
    "FifoQueue": "true",
    "ContentBasedDeduplication": "true"
  }
}

这是示例发送消息格式:(Amazon SQS:SendMessage Stage)

{
  "QueueUrl.$": "$.createQueue.QueueUrl",
  "MessageGroupId.$": "$$.Execution.Name",
  "MessageBody": {
    "message": "Job Succeeded",
    "trace.$": "$"
  }
}

备注:

  • MessageGroupId很重要,否则会失败。你可以分配任何东西。在上面的示例中,$$.Execution.Name 作为 MessageGroupId 提供,它指的是状态机主动 运行 周期的上下文对象的执行名称。例如:b3d429a5-d499-5806-095d-524384x937d2
  • 如果您没有将 ContentBasedDeduplication 指定为 true,那么您应该提供一个 MessageDeduplicationId 字段,每条消息都有一个唯一的 ID没有 lambda 的帮助很难在步进函数中实现。
  • 勾选Add original input to output using ResultPath并选择Combine original input with result然后输入一个值,例如$.createQueue 在 CreateQueue 阶段的 Output 选项卡中。这为您提供 $.createQueue.QueueUrl 以及来自先前阶段的其他变量。
  • 如果队列已经存在,CreateQueue 也可以工作。它在每次执行中为您提供 queueUrl,以供下一阶段使用。仅当您尝试更改现有队列上的 属性 时才会失败。

状态定义:

"CreateQueue": {
  "Type": "Task",
  "Next": "Command Selector",
  "Parameters": {
    "QueueName.$": "States.Format('{}.fifo',$.sqsQueueName)",
    "Attributes": {
      "FifoQueue": "true",
      "ContentBasedDeduplication": "true"
    }
  },
  "Resource": "arn:aws:states:::aws-sdk:sqs:createQueue",
  "ResultPath": "$.createQueue"
}

这是 API 参考,您可以在其中找到其他属性。 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html