AWS Step 函数和 Athena:我可以从输入路径配置查询字符串吗?

AWS Step function And Athena : can I configure query string from inputpath?

在我的步骤函数中,我想执行一个 Athena 查询。我能够定义一个步骤并成功执行查询。但是,我想将一些参数作为输入传递并在查询字符串中使用它们。例如。

比方说,我的查询字符串是:

select * from <Data Source>.<database>.<tablename> where partition_0 = '2021';

我希望能够将年份作为输入 json 传递给步进函数,例如:

{
"YYYY": 2021
}

是否可以在查询字符串中插入输入“YYYY”?如果是,怎么做?

示例步骤函数配置:

{
  "Comment": "Start athena exececution",
  "StartAt": "athena",
  "States": {
    "athena": {
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "select * from mycatalog.mydatabase.mytable where partition_0 = '2021'",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://mys3bucket"
        }
      },
      "Next": "Pass"
    },
    "Pass": {
      "Comment": "A Pass state passes its input to its output, without performing work. Pass states are useful when constructing and debugging state machines.",
      "Type": "Pass",
      "End": true
    }
  }
}

使用 Pass State 将输入 YYYY 插入查询字符串:

"QueryPassTask": {
  "Type": "Pass",
  "ResultPath": "$.athena",
  "Parameters": {
    "query.$": "States.Format('select * from mycatalog.mydatabase.mytable where partition_0 = \'{}\'', $.YYYY)"
  },

Pass任务输出为:

{
  "YYYY": 2021,
  "athena": {
    "query": "select * from mycatalog.mydatabase.mytable where partition_0 = '2021'"
  }
}

接下来,向 Athena 任务提供查询字符串。不要忘记密钥上的“.$”后缀。这告诉 Step Functions 键的值包含替换。

 "Parameters": {
        "QueryString.$": "$.athena.query",

除了 fedenov 的回答:我认为您不一定需要中间 Pass 状态。 Pass 状态也不允许您使用 ResultSelector (这可能很烦人)。您可以直接在 Athena 查询任务中进行字符串格式化。

下面是一个例子。

{
    "athena": {
        "catalog": "my_catalog",
        "database": "my_database",
        "outputLocation": "S3://my_bucket...",
        "queryString": "SELECT ... {}",
        "year": "2022"
    }
}
    "queryData": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryExecutionContext": {
          "Catalog.$": "$.athena.catalog",
          "Database.$": "$.athena.database"
        },
        "ResultConfiguration": {
          "OutputLocation.$": "$.athena.outputLocation"
        },
        "QueryString.$": "States.Format($.athena.queryString, $.athena.year)",
        "WorkGroup": "primary"
      },
      "ResultPath": "$.query",
      "Next": "nextStep"
    },