有没有办法让 Step Functions 输入值进入 EMR step Args
Is there a way to get Step Functions input values into EMR step Args
我们正在使用 AWS EMR 集群 运行ning 批处理 spark 作业。这些作业 运行 定期进行,我们希望通过 AWS Step Functions 来编排这些作业。
截至 2019 年 11 月,Step Functions 原生支持 EMR。向集群添加 Step 时,我们可以使用以下配置:
"Some Step": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.cluster.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--class",
"com.some.package.Class",
"JarUri",
"--startDate",
"$.time",
"--daysToLookBack",
"$.daysToLookBack"
]
}
}
},
"Retry" : [
{
"ErrorEquals": [ "States.ALL" ],
"IntervalSeconds": 1,
"MaxAttempts": 1,
"BackoffRate": 2.0
}
],
"ResultPath": "$.firstStep",
"End": true
}
在 HadoopJarStep 的参数列表中,我们希望动态设置参数。例如如果状态机执行的输入是:
{
"time": "2020-01-08",
"daysToLookBack": 2
}
配置中以“$”开头的字符串。执行State Machine时应该相应替换,EMR集群上的step应该运行command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2
。但它 运行s command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack
.
有谁知道有没有办法做到这一点?
参数允许您定义键值对,因此 "Args" 键的值是一个数组,您将无法动态引用数组中的特定元素,您需要而是引用整个数组。例如 "Args.$": "$.Input.ArgsArray".
因此,对于您的用例,实现此目的的最佳方法是在调用此状态之前添加预处理状态。在预处理状态下,您可以调用 Lambda 函数并通过代码格式化 input/output,或者对于向数组添加动态值这样简单的事情,您可以使用 Pass State to reformat the data and then inside your task State Parameters you can use JSONPath 来获取数组您在预处理器中定义。这是一个例子:
{
"Comment": "A Hello World example of the Amazon States Language using Pass states",
"StartAt": "HardCodedInputs",
"States": {
"HardCodedInputs": {
"Type": "Pass",
"Parameters": {
"cluster": {
"ClusterId": "ValueForClusterIdVariable"
},
"time": "ValueForTimeVariable",
"daysToLookBack": "ValueFordaysToLookBackVariable"
},
"Next": "Pre-Process"
},
"Pre-Process": {
"Type": "Pass",
"Parameters": {
"FormattedInputsForEmr": {
"ClusterId.$": "$.cluster.ClusterId",
"Args": [
{
"Arg1": "spark-submit"
},
{
"Arg2": "--class"
},
{
"Arg3": "com.some.package.Class"
},
{
"Arg4": "JarUri"
},
{
"Arg5": "--startDate"
},
{
"Arg6.$": "$.time"
},
{
"Arg7": "--daysToLookBack"
},
{
"Arg8.$": "$.daysToLookBack"
}
]
}
},
"Next": "Some Step"
},
"Some Step": {
"Type": "Pass",
"Parameters": {
"ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args.$": "$.FormattedInputsForEmr.Args[*][*]"
}
}
},
"End": true
}
}
}
您可以使用 States.Array()
内部函数。您的 Parameters
变为:
"Parameters": {
"ClusterId.$": "$.cluster.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args.$": "States.Array('spark-submit', '--class', 'com.some.package.Class', 'JarUri', '--startDate', $.time, '--daysToLookBack', '$.daysToLookBack')"
}
}
}
内部函数已记录 here,但我认为它没有很好地解释用法。 Step Functions 控制台中提供的代码片段更有用。
请注意,您还可以使用 States.Format()
对 args 进行字符串格式化。例如,您可以使用输入变量作为最终路径段来构建路径:
"Args.$": "States.Array('mycommand', '--path', States.Format('my/base/path/{}', $.someInputVariable))"
我们正在使用 AWS EMR 集群 运行ning 批处理 spark 作业。这些作业 运行 定期进行,我们希望通过 AWS Step Functions 来编排这些作业。
截至 2019 年 11 月,Step Functions 原生支持 EMR。向集群添加 Step 时,我们可以使用以下配置:
"Some Step": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.cluster.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--class",
"com.some.package.Class",
"JarUri",
"--startDate",
"$.time",
"--daysToLookBack",
"$.daysToLookBack"
]
}
}
},
"Retry" : [
{
"ErrorEquals": [ "States.ALL" ],
"IntervalSeconds": 1,
"MaxAttempts": 1,
"BackoffRate": 2.0
}
],
"ResultPath": "$.firstStep",
"End": true
}
在 HadoopJarStep 的参数列表中,我们希望动态设置参数。例如如果状态机执行的输入是:
{
"time": "2020-01-08",
"daysToLookBack": 2
}
配置中以“$”开头的字符串。执行State Machine时应该相应替换,EMR集群上的step应该运行command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2
。但它 运行s command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack
.
有谁知道有没有办法做到这一点?
参数允许您定义键值对,因此 "Args" 键的值是一个数组,您将无法动态引用数组中的特定元素,您需要而是引用整个数组。例如 "Args.$": "$.Input.ArgsArray".
因此,对于您的用例,实现此目的的最佳方法是在调用此状态之前添加预处理状态。在预处理状态下,您可以调用 Lambda 函数并通过代码格式化 input/output,或者对于向数组添加动态值这样简单的事情,您可以使用 Pass State to reformat the data and then inside your task State Parameters you can use JSONPath 来获取数组您在预处理器中定义。这是一个例子:
{
"Comment": "A Hello World example of the Amazon States Language using Pass states",
"StartAt": "HardCodedInputs",
"States": {
"HardCodedInputs": {
"Type": "Pass",
"Parameters": {
"cluster": {
"ClusterId": "ValueForClusterIdVariable"
},
"time": "ValueForTimeVariable",
"daysToLookBack": "ValueFordaysToLookBackVariable"
},
"Next": "Pre-Process"
},
"Pre-Process": {
"Type": "Pass",
"Parameters": {
"FormattedInputsForEmr": {
"ClusterId.$": "$.cluster.ClusterId",
"Args": [
{
"Arg1": "spark-submit"
},
{
"Arg2": "--class"
},
{
"Arg3": "com.some.package.Class"
},
{
"Arg4": "JarUri"
},
{
"Arg5": "--startDate"
},
{
"Arg6.$": "$.time"
},
{
"Arg7": "--daysToLookBack"
},
{
"Arg8.$": "$.daysToLookBack"
}
]
}
},
"Next": "Some Step"
},
"Some Step": {
"Type": "Pass",
"Parameters": {
"ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args.$": "$.FormattedInputsForEmr.Args[*][*]"
}
}
},
"End": true
}
}
}
您可以使用 States.Array()
内部函数。您的 Parameters
变为:
"Parameters": {
"ClusterId.$": "$.cluster.ClusterId",
"Step": {
"Name": "FirstStep",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args.$": "States.Array('spark-submit', '--class', 'com.some.package.Class', 'JarUri', '--startDate', $.time, '--daysToLookBack', '$.daysToLookBack')"
}
}
}
内部函数已记录 here,但我认为它没有很好地解释用法。 Step Functions 控制台中提供的代码片段更有用。
请注意,您还可以使用 States.Format()
对 args 进行字符串格式化。例如,您可以使用输入变量作为最终路径段来构建路径:
"Args.$": "States.Array('mycommand', '--path', States.Format('my/base/path/{}', $.someInputVariable))"