来自 SQS 的 AWS Step Functions 状态机结果选择器
AWS Step Functions State Machine Result Selctor from SQS
我正在构建我的第一个简单的 AWS Step Function:
第 1 步:读取来自 SQS 的消息
第 2 步:运行 Redshift 查询
我已经成功地设置了我的初始工作流程和对资源的权限。我正在尝试格式化我的
这是我第一步的结果选择器代码:
"ResultSelector": {
"startTime.$": "$.Body.startTime",
"endTime.$": "$.Body.endTime"
}
这是错误输出:
{
"error": "States.Runtime",
"cause": "An error occurred while executing the state 'ReceiveMessage from SQS' (entered at the event id #2). The JSONPath '$.Body.startTime' specified for the field 'startTime.$' could not be found in the input '{\"Messages\":[{\"Body\":\"{\\"startTime\\": 12828373, \\"endTime\\": 12828374}\",\"Md5OfBody\":\"1ca004b811be50a7579f3a7e6affaeb1\",\"MessageId\":\"5b6b72d6-3fb7-451a-9053-26b72de32768\",\"ReceiptHandle\":\"AQEBjgttgZBNSuGZb0LFjo16Xc2+5uc48k9QPh9af4vTGy1xzb/BZZ6mBFx0FQ4ALWUoLfppWHIJbb7Zax2+Jv2dqekvLoCWWrdjasyrJnGfduXwsY20cPuW86kXz4RJfP4qbnyvWcV5Cb63u26XUE3S+3AqREo2BNwi01mI3ceYWxguXzgSgDIMg/07Lt5kBcNvB6qCIexQDMDgH91ZmmINLuX0j5gd3spHCmzBvFoQKv4PbLBS18PsC6vL1YLGxplZ+eVyzD3eoK/5AU0jqmI0l0vjn4qZCVf2iOupkwEmMe4V2AaEdxI1FJ9dSU8zPkqFhbB3na56eIXgIGwsTWs5WBvlHikTuyYjWNFn6r27qCVfwADGkENoXZG1++vRoRse9X3p5fNqqVNBL1NTBL3k3/hoxB7/A922PT0MTn7rm1I=\"}]}'"
}
问题很明显:
我仍然没有创建正确的结果选择器。
我已经 json 格式化了我在 Python 中的输入:
message = {
'startTime': 12828373,
'endTime': 12828374
}
# Send message to SQS queue
response = sqs.send_message(
QueueUrl=queue_url,
DelaySeconds=10,
MessageBody=json.dumps(message)
)
不过step函数好像是在抓取多条消息。我也没有正确提取数据。
请帮助结果选择器如何提取开始时间和结束时间。
还有什么我想念的或应该做的不同吗?
我的整个步骤函数如下:
{
"Comment": "Run Redshift Queries",
"StartAt": "ReceiveMessage from SQS",
"States": {
"ReceiveMessage from SQS": {
"Type": "Task",
"Parameters": {
"QueueUrl": "******"
},
"Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
"Next": "Run Analysis Queries",
"ResultSelector": {
"startTime.$": "$.Body.startTime",
"endTime.$": "$.Body.endTime"
}
},
"Run Analysis Queries": {
"Type": "Task",
"End": true,
"Parameters": {
"ClusterIdentifier": "******",
"Database": "prod",
"Sql": "select * from ******"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
}
},
"TimeoutSeconds": 3600
}
从错误信息来看,你第一个状态的输出是:
{
"Messages": [
{
"Body": "{\"startTime\": 12828373, \"endTime\": 12828374}",
"Md5OfBody": "1ca004b811be50a7579f3a7e6affaeb1",
"MessageId": "5b6b72d6-3fb7-451a-9053-26b72de32768",
"ReceiptHandle": "AQEBjgtt...."
}
]
}
所以目标在$.Messages[0].Body
下,是一个JSON字符串。您可以使用内置的内部函数来处理它:
{
"Comment": "Run Redshift Queries",
"StartAt": "ReceiveMessage from SQS",
"States": {
"ReceiveMessage from SQS": {
...
"ResultSelector": {
"body.$": "States.StringToJson($.Messages[0].Body)"
},
...
}
},
...
}
第一个状态的输出将是:
{
"body": {
"startTime": 12828373,
"endTime": 12828374
}
}
如果您经常使用 AWS SFN 开发应用程序,您可以订阅他们的 What's New Feed。他们会在这里宣布新功能或改进,这可能会有所帮助。
参考文献:
我正在构建我的第一个简单的 AWS Step Function: 第 1 步:读取来自 SQS 的消息 第 2 步:运行 Redshift 查询
我已经成功地设置了我的初始工作流程和对资源的权限。我正在尝试格式化我的
这是我第一步的结果选择器代码:
"ResultSelector": {
"startTime.$": "$.Body.startTime",
"endTime.$": "$.Body.endTime"
}
这是错误输出:
{
"error": "States.Runtime",
"cause": "An error occurred while executing the state 'ReceiveMessage from SQS' (entered at the event id #2). The JSONPath '$.Body.startTime' specified for the field 'startTime.$' could not be found in the input '{\"Messages\":[{\"Body\":\"{\\"startTime\\": 12828373, \\"endTime\\": 12828374}\",\"Md5OfBody\":\"1ca004b811be50a7579f3a7e6affaeb1\",\"MessageId\":\"5b6b72d6-3fb7-451a-9053-26b72de32768\",\"ReceiptHandle\":\"AQEBjgttgZBNSuGZb0LFjo16Xc2+5uc48k9QPh9af4vTGy1xzb/BZZ6mBFx0FQ4ALWUoLfppWHIJbb7Zax2+Jv2dqekvLoCWWrdjasyrJnGfduXwsY20cPuW86kXz4RJfP4qbnyvWcV5Cb63u26XUE3S+3AqREo2BNwi01mI3ceYWxguXzgSgDIMg/07Lt5kBcNvB6qCIexQDMDgH91ZmmINLuX0j5gd3spHCmzBvFoQKv4PbLBS18PsC6vL1YLGxplZ+eVyzD3eoK/5AU0jqmI0l0vjn4qZCVf2iOupkwEmMe4V2AaEdxI1FJ9dSU8zPkqFhbB3na56eIXgIGwsTWs5WBvlHikTuyYjWNFn6r27qCVfwADGkENoXZG1++vRoRse9X3p5fNqqVNBL1NTBL3k3/hoxB7/A922PT0MTn7rm1I=\"}]}'"
}
问题很明显: 我仍然没有创建正确的结果选择器。
我已经 json 格式化了我在 Python 中的输入:
message = {
'startTime': 12828373,
'endTime': 12828374
}
# Send message to SQS queue
response = sqs.send_message(
QueueUrl=queue_url,
DelaySeconds=10,
MessageBody=json.dumps(message)
)
不过step函数好像是在抓取多条消息。我也没有正确提取数据。
请帮助结果选择器如何提取开始时间和结束时间。 还有什么我想念的或应该做的不同吗?
我的整个步骤函数如下:
{
"Comment": "Run Redshift Queries",
"StartAt": "ReceiveMessage from SQS",
"States": {
"ReceiveMessage from SQS": {
"Type": "Task",
"Parameters": {
"QueueUrl": "******"
},
"Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
"Next": "Run Analysis Queries",
"ResultSelector": {
"startTime.$": "$.Body.startTime",
"endTime.$": "$.Body.endTime"
}
},
"Run Analysis Queries": {
"Type": "Task",
"End": true,
"Parameters": {
"ClusterIdentifier": "******",
"Database": "prod",
"Sql": "select * from ******"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
}
},
"TimeoutSeconds": 3600
}
从错误信息来看,你第一个状态的输出是:
{
"Messages": [
{
"Body": "{\"startTime\": 12828373, \"endTime\": 12828374}",
"Md5OfBody": "1ca004b811be50a7579f3a7e6affaeb1",
"MessageId": "5b6b72d6-3fb7-451a-9053-26b72de32768",
"ReceiptHandle": "AQEBjgtt...."
}
]
}
所以目标在$.Messages[0].Body
下,是一个JSON字符串。您可以使用内置的内部函数来处理它:
{
"Comment": "Run Redshift Queries",
"StartAt": "ReceiveMessage from SQS",
"States": {
"ReceiveMessage from SQS": {
...
"ResultSelector": {
"body.$": "States.StringToJson($.Messages[0].Body)"
},
...
}
},
...
}
第一个状态的输出将是:
{
"body": {
"startTime": 12828373,
"endTime": 12828374
}
}
如果您经常使用 AWS SFN 开发应用程序,您可以订阅他们的 What's New Feed。他们会在这里宣布新功能或改进,这可能会有所帮助。