AWS Step Functions does not add next step to EMR cluster when current step fails

I have set up a state machine with AWS Step Functions that creates an EMR cluster, adds a few EMR steps, and then terminates the cluster. This works fine as long as all the steps run to completion without errors. If a step fails, the next step is not added, even though I have added a Catch to continue to it. Whenever a step fails, that step is marked as caught (highlighted in the graph), but the next step is marked as cancelled.

Here is my Step Functions definition, in case it helps:

{
  "StartAt": "MyEMR-SMFlowContainer-beta",
  "States": {
    "MyEMR-SMFlowContainer-beta": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "CreateClusterStep-feature-generation-cluster-beta",
          "States": {
            "CreateClusterStep-feature-generation-cluster-beta": {
              "Next": "Step-SuccessfulJobOne",
              "Type": "Task",
              "ResultPath": "$.Cluster.1.CreateClusterTask",
              "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
              "Parameters": {
                "Instances": {
                  "Ec2SubnetIds": [
                    "subnet-*******345fd38423"
                  ],
                  "InstanceCount": 2,
                  "KeepJobFlowAliveWhenNoSteps": true,
                  "MasterInstanceType": "m4.xlarge",
                  "SlaveInstanceType": "m4.xlarge"
                },
                "JobFlowRole": "MyEMR-emrInstance-beta-EMRInstanceRole",
                "Name": "emr-step-fail-handle-test-cluster",
                "ServiceRole": "MyEMR-emr-beta-EMRRole",
                "Applications": [
                  {
                    "Name": "Spark"
                  },
                  {
                    "Name": "Hadoop"
                  }
                ],
                "AutoScalingRole": "MyEMR-beta-FeatureG-CreateClusterStepfeature-NJB2UG1J1EWB",
                "Configurations": [
                  {
                    "Classification": "spark-env",
                    "Configurations": [
                      {
                        "Classification": "export",
                        "Properties": {
                          "PYSPARK_PYTHON": "/usr/bin/python3"
                        }
                      }
                    ]
                  }
                ],
                "LogUri": "s3://MyEMR-beta-feature-createclusterstepfeature-1jpp1wp3dfn04/emr/logs/",
                "ReleaseLabel": "emr-5.32.0",
                "VisibleToAllUsers": true
              }
            },
            "Step-SuccessfulJobOne": {
              "Next": "Step-AlwaysFailingJob",
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "Step-AlwaysFailingJob"
                }
              ],
              "Type": "Task",
              "TimeoutSeconds": 7200,
              "ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
                "Step": {
                  "Name": "SuccessfulJobOne",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "client",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.logConf=true",
                      "--class",
                      "com.test.sample.core.EMRJobRunner",
                      "s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
                      "--JOB_NUMBER",
                      "1",
                      "--JOB_KEY",
                      "SuccessfulJobOne"
                    ]
                  }
                }
              }
            },
            "Step-AlwaysFailingJob": {
              "Next": "Step-SuccessfulJobTwo",
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "Step-SuccessfulJobTwo"
                }
              ],
              "Type": "Task",
              "TimeoutSeconds": 7200,
              "ResultPath": "$.ClusterStep.AlwaysFailingJob.AddSparkTask",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
                "Step": {
                  "Name": "AlwaysFailingJob",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "client",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.logConf=true",
                      "--class",
                      "com.test.sample.core.EMRJobRunner",
                      "s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
                      "--JOB_NUMBER",
                      "2",
                      "--JOB_KEY",
                      "AlwaysFailingJob"
                    ]
                  }
                }
              }
            },
            "Step-SuccessfulJobTwo": {
              "Next": "TerminateClusterStep-feature-generation-cluster-beta",
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "Next": "TerminateClusterStep-feature-generation-cluster-beta"
                }
              ],
              "Type": "Task",
              "TimeoutSeconds": 7200,
              "ResultPath": "$.ClusterStep.SuccessfulJobTwo.AddSparkTask",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId",
                "Step": {
                  "Name": "DeviceJob",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "client",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.logConf=true",
                      "--class",
                      "com.test.sample.core.EMRJobRunner",
                      "s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar",
                      "--JOB_NUMBER",
                      "3",
                      "--JOB_KEY",
                      "SuccessfulJobTwo"
                    ]
                  }
                }
              }
            },
            "TerminateClusterStep-feature-generation-cluster-beta": {
              "End": true,
              "Type": "Task",
              "ResultPath": null,
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId"
              }
            }
          }
        }
      ]
    }
  },
  "TimeoutSeconds": 43200
}

Can someone advise how to catch a failure in a step, ignore it, and add the next step? Thanks in advance.

The problem was that I had not specified a ResultPath in the Catch properties. Since ResultPath defaults to $, the catch block's error output was overwriting the whole state input. The next step could not get the cluster information because it had been overwritten, and was therefore cancelled. This was my original Catch:

      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Step-SuccessfulJobTwo"
        }
      ],
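With no ResultPath given here, the error output produced by the Catch replaces the entire state input. Roughly, the next state then receives only something like the following (an illustrative sketch; the Cause text is a placeholder):

    {
      "Error": "States.TaskFailed",
      "Cause": "..."
    }

At that point "$.Cluster.1.CreateClusterTask.ClusterId" no longer resolves, which is why the next step was cancelled.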

Once I updated the Catch with a proper ResultPath, it worked as expected:

      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Step-SuccessfulJobTwo",
          "ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask.Error",
        }
      ],
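With the ResultPath set, the error output is grafted onto the state input at that path instead of replacing it, so the cluster information is preserved. The next state then sees roughly this (an abbreviated, illustrative sketch; the ClusterId and Cause values are placeholders):

    {
      "Cluster": {
        "1": {
          "CreateClusterTask": {
            "ClusterId": "j-XXXXXXXXXXXXX"
          }
        }
      },
      "ClusterStep": {
        "SuccessfulJobOne": {
          "AddSparkTask": {
            "Error": {
              "Error": "States.TaskFailed",
              "Cause": "..."
            }
          }
        }
      }
    }

"$.Cluster.1.CreateClusterTask.ClusterId" still resolves, so the next step is added to the cluster as expected.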