AWS step function does not add next step to EMR cluster when current step fails
I have set up a state machine with AWS Step Functions that creates an EMR cluster, adds a few EMR steps, and then terminates the cluster. This works fine as long as all of the steps run to completion without any errors. If a step fails, however, the next step is not executed, even though I added a Catch to continue to it. Whenever a step fails, that step is marked as caught (highlighted in the graph), but the next step is marked as cancelled.
Here is my step function definition, in case it helps:
{
"StartAt": "MyEMR-SMFlowContainer-beta",
"States": {
"MyEMR-SMFlowContainer-beta": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "CreateClusterStep-feature-generation-cluster-beta",
"States": {
"CreateClusterStep-feature-generation-cluster-beta": {
"Next": "Step-SuccessfulJobOne",
"Type": "Task",
"ResultPath": "$.Cluster.1.CreateClusterTask",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Instances": {
"Ec2SubnetIds": [
"subnet-*******345fd38423"
],
"InstanceCount": 2,
"KeepJobFlowAliveWhenNoSteps": true,
"MasterInstanceType": "m4.xlarge",
"SlaveInstanceType": "m4.xlarge"
},
"JobFlowRole": "MyEMR-emrInstance-beta-EMRInstanceRole",
"Name": "emr-step-fail-handle-test-cluster",
"ServiceRole": "MyEMR-emr-beta-EMRRole",
"Applications": [
{
"Name": "Spark"
},
{
"Name": "Hadoop"
}
],
"AutoScalingRole": "MyEMR-beta-FeatureG-CreateClusterStepfeature-NJB2UG1J1EWB",
"Configurations": [
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
}
],
"LogUri": "s3://MyEMR-beta-feature-createclusterstepfeature-1jpp1wp3dfn04/emr/logs/",
"ReleaseLabel": "emr-5.32.0",
"VisibleToAllUsers": true
}
},
"Step-SuccessfulJobOne": {
"Next": "Step-AlwaysFailingJob",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Step-AlwaysFailingJob"
}
],
"Type": "Task",
"TimeoutSeconds": 7200,
"ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
"Step": {
"Name": "SuccessfulJobOne",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"client",
"--master",
"yarn",
"--conf",
"spark.logConf=true",
"--class",
"com.test.sample.core.EMRJobRunner",
"s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
"--JOB_NUMBER",
"1",
"--JOB_KEY",
"SuccessfulJobOne"
]
}
}
}
},
"Step-AlwaysFailingJob": {
"Next": "Step-SuccessfulJobTwo",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Step-SuccessfulJobTwo"
}
],
"Type": "Task",
"TimeoutSeconds": 7200,
"ResultPath": "$.ClusterStep.AlwaysFailingJob.AddSparkTask",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
"Step": {
"Name": "AlwaysFailingJob",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"client",
"--master",
"yarn",
"--conf",
"spark.logConf=true",
"--class",
"com.test.sample.core.EMRJobRunner",
"s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
"--JOB_NUMBER",
"2",
"--JOB_KEY",
"AlwaysFailingJob"
]
}
}
}
},
"Step-SuccessfulJobTwo": {
"Next": "TerminateClusterStep-feature-generation-cluster-beta",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "TerminateClusterStep-feature-generation-cluster-beta"
}
],
"Type": "Task",
"TimeoutSeconds": 7200,
"ResultPath": "$.ClusterStep.SuccessfulJobTwo.AddSparkTask",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
"Step": {
"Name": "DeviceJob",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"client",
"--master",
"yarn",
"--conf",
"spark.logConf=true",
"--class",
"com.test.sample.core.EMRJobRunner",
"s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
"--JOB_NUMBER",
"3",
"--JOB_KEY",
"SuccessfulJobTwo"
]
}
}
}
},
"TerminateClusterStep-feature-generation-cluster-beta": {
"End": true,
"Type": "Task",
"ResultPath": null,
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId"
}
}
}
}
]
}
},
"TimeoutSeconds": 43200
}
Could someone advise how I can catch a failure in a step, ignore it, and add the next step?
Thanks in advance.
The problem was that I was not specifying ResultPath in the Catch property. Because ResultPath defaults to $, the catch block was overwriting the state output. The next step could not read the cluster information, since it had been overwritten, and was therefore cancelled.
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Step-SuccessfulJobTwo"
}
],
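With this Catch, since ResultPath defaults to $, the caught error object replaces the entire state output. The input to the next state then looks roughly like the following sketch (the exact Cause string depends on the failure):

{
  "Error": "States.TaskFailed",
  "Cause": "..."
}

Since $.Cluster.1.CreateClusterTask.ClusterId no longer exists in the state, the ClusterId.$ reference in the next addStep task cannot be resolved, and the step is cancelled.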
Once I updated the Catch with a proper ResultPath, it worked as expected.
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Step-SuccessfulJobTwo",
"ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask.Error",
}
],
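With the ResultPath set, the error details are appended at $.ClusterStep.SuccessfulJobOne.AddSparkTask.Error while the rest of the state, including the cluster information, is preserved. The next state's input then looks roughly like this sketch (the ClusterId value and the elided fields are placeholders):

{
  "Cluster": {
    "1": {
      "CreateClusterTask": {
        "ClusterId": "j-XXXXXXXXXXX"
      }
    }
  },
  "ClusterStep": {
    "SuccessfulJobOne": {
      "AddSparkTask": {
        "Error": {
          "Error": "States.TaskFailed",
          "Cause": "..."
        }
      }
    }
  }
}

The same ResultPath needs to go into the Catch of each step, with the path adjusted per step (for example $.ClusterStep.AlwaysFailingJob.AddSparkTask.Error), so that no single failing step clobbers the cluster information for the steps after it.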