在 AWS Step Functions 中传递变量

Pass Variables in AWS Step Functions

我对 AWS Step Functions 和 AWS Lambda Functions 还很陌生,确实需要一些帮助。我有一个状态机,我试图检查我的 S3 存储桶中是否存在某个文件,然后如果文件存在则让状态机遵循一条路径,如果不存在则遵循不同的路径。

下面显示了涵盖此问题的我的状态机代码的开头

{
  "Comment": "This is a test for running the structure of the CustomCreate job.",
  "StartAt": "PreStep",
  "States": {
    "PreStep": {
      "Comment": "Check that all the necessary files exist before running the job.",
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:XXXXXXXXXX:function:CustomCreate-PreStep-Function",
      "Next": "Run Job Choice"
    },
    "Run Job Choice": {
      "Comment": "This step chooses whether or not to go forward with running the main job.",
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.FoundNecessaryFiles",
          "BooleanEquals": true,
          "Next": "Spin Up Cluster"
        },
        {
          "Variable": "$.FoundNecessaryFiles",
          "BooleanEquals": false,
          "Next": "Do Not Run Job"
        }
      ]
    },
    "Do Not Run Job": {
      "Comment": "This step triggers if the PreStep fails and the job should not run.",
      "Type": "Fail",
      "Cause": "PreStep unsuccessful"
    },
    "Spin Up Cluster": {
      "Comment": "Spins up the EMR Cluster.",
      "Type": "Pass",
      "Next": "Update Env"
    },
    "Update Env": {
      "Comment": "Update the environment variables in the EMR Cluster.",
      "Type": "Pass",
      "Next": "Run Job"
    },
    "Run Job": {
      "Comment": "Add steps to the EMR Cluster.",
      "Type": "Pass",
      "End": true
    }
  }
}

以下代码描述了我的 CustomCreate-PreStep-Function Lambda 函数

exports.handler = async function(event, context, callback) {
     var AWS = require('aws-sdk');
     var s3 = new AWS.S3();
     var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
     s3.getObject(params, function(err, data) {

        if (err) {
            console.log(err, err.stack);
            // file does not exist
            console.log("failed");
            callback(null,false);
        }
        else {
            console.log(data);
            //file exist
            console.log("succeeded");
            var FoundNecessaryFiles = true;
            // return FoundNecessaryFiles;
            callback(null,event.FoundNecessaryFiles=true);
        }    
    });
};

我已经尝试过多种方法,但一直无法正常工作。如您所见,我正在尝试使用 Lambda 函数传回状态为 true/false 的变量 FoundNecessaryFiles,具体取决于是否找到文件并使用该值来指导下一步的选择。最好通过修复变量回传来解决这个问题,因为稍后我可能需要在状态机中再次使用该方法,但我也愿意接受另一种解决方案,无论是组合步骤还是其他可能有效的解决方案.

另外,我在此过程中的下一步将是启动一个 AWS EMR 集群,前提是存在适当的文件,这我也不清楚如何完成。如果有人能够使用 Step Functions 在 运行 AWS EMR 集群中提供任何帮助,我将不胜感激。

我解决了传递变量的最初问题,但是,我仍然可以真正使用一些帮助通过 Step Functions 运行获得 EMR 集群。

对于那些可能遇到类似问题的人,我通过将我的 Lambda 函数更改为以下内容来解决我的变量传递问题

exports.handler = function(event, context, callback) {
     var AWS = require('aws-sdk');
     var s3 = new AWS.S3();
     var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
     s3.getObject(params, function(err, data) {

        if (err) {
            console.log(err, err.stack);
            // file does not exist
            console.log("failed");
            event.FoundNecessaryFiles = false;
            callback(null,event);
        }
        else {
            console.log(data);
            //file exist
            console.log("succeeded");
            event.FoundNecessaryFiles = true;
            callback(null,event);
        }    
    });
};

我的下一个问题是设置 AWS EMR 集群。我目前的第一个任务是启动一个 EMR 集群,这可以通过直接使用 Step Function JSON,或者最好使用我在 S3 存储桶上找到的 JSON 集群配置文件来完成。我的下一个任务是更新 EMR 集群环境变量,我的 S3 存储桶上有一个 .sh 脚本可以执行此操作,我只是不知道如何使用 Step Functions 将其应用于 EMR 集群。我的第三个任务是向 EMR 集群添加一个包含 spark-submit 命令的步骤,该命令在我的 S3 存储桶上的 JSON 配置文件中进行了描述,可以以类似的方式上传到 EMR 集群在上一步中上传环境配置文件。最后,我想要一个确保 EMR 集群在完成 运行.

后终止的任务

如果您对此提供任何帮助,我们将不胜感激,无论您是遵循我上面概述的结构还是知道改变结构的解决方案,我都很乐意提供任何帮助。

这可能对某些人有所帮助,因为 AWS 步函数中引入了新功能。

  1. 为了在 EMR 集群上设置变量或部署软件,应该使用 BootStrap 操作。您可以提供 bootstrap 脚本的 S3 路径。 Bootstrap 脚本作为集群创建过程的一部分运行。

  2. 可以使用 Step Functions 添加 EMR 步骤(同步或异步)。可以将 spark-submit 构建为 Args 数组。这是一个例子:

ARGS_ARRAY = [ "spark-submit",
              "--master=yarn",
              "--deploy-mode=cluster",
              "--name=Generate Bulk File",
              "--driver-cores=1",
              "--driver-memory=2g",
              "--executor-cores=5",
              "--executor-memory=9g",
              "/home/hadoop/charu/mySparkJob.py", 
              input_file_name,
              output_file_name] 

  1. 可以根据之前的 EMR 步骤将工作流中的最后一步添加为 'Terminate EMR Cluster',即如果之前的 EMR 步骤是同步的并成功完成,则只会终止集群。 (假设这就是一个人想要的)