在 AWS Step Functions 中传递变量
Pass Variables in AWS Step Functions
我对 AWS Step Functions 和 AWS Lambda Functions 还很陌生,确实需要一些帮助。我有一个状态机,我试图检查我的 S3 存储桶中是否存在某个文件,然后如果文件存在则让状态机遵循一条路径,如果不存在则遵循不同的路径。
下面显示了涵盖此问题的我的状态机代码的开头
{
"Comment": "This is a test for running the structure of the CustomCreate job.",
"StartAt": "PreStep",
"States": {
"PreStep": {
"Comment": "Check that all the necessary files exist before running the job.",
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:XXXXXXXXXX:function:CustomCreate-PreStep-Function",
"Next": "Run Job Choice"
},
"Run Job Choice": {
"Comment": "This step chooses whether or not to go forward with running the main job.",
"Type": "Choice",
"Choices": [
{
"Variable": "$.FoundNecessaryFiles",
"BooleanEquals": true,
"Next": "Spin Up Cluster"
},
{
"Variable": "$.FoundNecessaryFiles",
"BooleanEquals": false,
"Next": "Do Not Run Job"
}
]
},
"Do Not Run Job": {
"Comment": "This step triggers if the PreStep fails and the job should not run.",
"Type": "Fail",
"Cause": "PreStep unsuccessful"
},
"Spin Up Cluster": {
"Comment": "Spins up the EMR Cluster.",
"Type": "Pass",
"Next": "Update Env"
},
"Update Env": {
"Comment": "Update the environment variables in the EMR Cluster.",
"Type": "Pass",
"Next": "Run Job"
},
"Run Job": {
"Comment": "Add steps to the EMR Cluster.",
"Type": "Pass",
"End": true
}
}
}
以下代码描述了我的 CustomCreate-PreStep-Function Lambda 函数
exports.handler = async function(event, context, callback) {
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err, err.stack);
// file does not exist
console.log("failed");
callback(null,false);
}
else {
console.log(data);
//file exist
console.log("succeeded");
var FoundNecessaryFiles = true;
// return FoundNecessaryFiles;
callback(null,event.FoundNecessaryFiles=true);
}
});
};
我已经尝试过多种方法,但一直无法正常工作。如您所见,我正在尝试使用 Lambda 函数传回状态为 true/false 的变量 FoundNecessaryFiles,具体取决于是否找到文件并使用该值来指导下一步的选择。最好通过修复变量回传来解决这个问题,因为稍后我可能需要在状态机中再次使用该方法,但我也愿意接受另一种解决方案,无论是组合步骤还是其他可能有效的解决方案.
另外,我在此过程中的下一步将是启动一个 AWS EMR 集群,前提是存在适当的文件,这我也不清楚如何完成。如果有人能够使用 Step Functions 在 运行 AWS EMR 集群中提供任何帮助,我将不胜感激。
我解决了传递变量的最初问题,但是,我仍然可以真正使用一些帮助通过 Step Functions 运行获得 EMR 集群。
对于那些可能遇到类似问题的人,我通过将我的 Lambda 函数更改为以下内容来解决我的变量传递问题
exports.handler = function(event, context, callback) {
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err, err.stack);
// file does not exist
console.log("failed");
event.FoundNecessaryFiles = false;
callback(null,event);
}
else {
console.log(data);
//file exist
console.log("succeeded");
event.FoundNecessaryFiles = true;
callback(null,event);
}
});
};
我的下一个问题是设置 AWS EMR 集群。我目前的第一个任务是启动一个 EMR 集群,这可以通过直接使用 Step Function JSON,或者最好使用我在 S3 存储桶上找到的 JSON 集群配置文件来完成。我的下一个任务是更新 EMR 集群环境变量,我的 S3 存储桶上有一个 .sh 脚本可以执行此操作,我只是不知道如何使用 Step Functions 将其应用于 EMR 集群。我的第三个任务是向 EMR 集群添加一个包含 spark-submit 命令的步骤,该命令在我的 S3 存储桶上的 JSON 配置文件中进行了描述,可以以类似的方式上传到 EMR 集群在上一步中上传环境配置文件。最后,我想要一个确保 EMR 集群在完成 运行.
后终止的任务
如果您对此提供任何帮助,我们将不胜感激,无论您是遵循我上面概述的结构还是知道改变结构的解决方案,我都很乐意提供任何帮助。
这可能对某些人有所帮助,因为 AWS 步函数中引入了新功能。
为了在 EMR 集群上设置变量或部署软件,应该使用 BootStrap 操作。您可以提供 bootstrap 脚本的 S3 路径。 Bootstrap 脚本作为集群创建过程的一部分运行。
可以使用 Step Functions 添加 EMR 步骤(同步或异步)。可以将 spark-submit 构建为 Args 数组。这是一个例子:
ARGS_ARRAY = [ "spark-submit",
"--master=yarn",
"--deploy-mode=cluster",
"--name=Generate Bulk File",
"--driver-cores=1",
"--driver-memory=2g",
"--executor-cores=5",
"--executor-memory=9g",
"/home/hadoop/charu/mySparkJob.py",
input_file_name,
output_file_name]
- 可以根据之前的 EMR 步骤将工作流中的最后一步添加为 'Terminate EMR Cluster',即如果之前的 EMR 步骤是同步的并成功完成,则只会终止集群。 (假设这就是一个人想要的)
我对 AWS Step Functions 和 AWS Lambda Functions 还很陌生,确实需要一些帮助。我有一个状态机,我试图检查我的 S3 存储桶中是否存在某个文件,然后如果文件存在则让状态机遵循一条路径,如果不存在则遵循不同的路径。
下面显示了涵盖此问题的我的状态机代码的开头
{
"Comment": "This is a test for running the structure of the CustomCreate job.",
"StartAt": "PreStep",
"States": {
"PreStep": {
"Comment": "Check that all the necessary files exist before running the job.",
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:XXXXXXXXXX:function:CustomCreate-PreStep-Function",
"Next": "Run Job Choice"
},
"Run Job Choice": {
"Comment": "This step chooses whether or not to go forward with running the main job.",
"Type": "Choice",
"Choices": [
{
"Variable": "$.FoundNecessaryFiles",
"BooleanEquals": true,
"Next": "Spin Up Cluster"
},
{
"Variable": "$.FoundNecessaryFiles",
"BooleanEquals": false,
"Next": "Do Not Run Job"
}
]
},
"Do Not Run Job": {
"Comment": "This step triggers if the PreStep fails and the job should not run.",
"Type": "Fail",
"Cause": "PreStep unsuccessful"
},
"Spin Up Cluster": {
"Comment": "Spins up the EMR Cluster.",
"Type": "Pass",
"Next": "Update Env"
},
"Update Env": {
"Comment": "Update the environment variables in the EMR Cluster.",
"Type": "Pass",
"Next": "Run Job"
},
"Run Job": {
"Comment": "Add steps to the EMR Cluster.",
"Type": "Pass",
"End": true
}
}
}
以下代码描述了我的 CustomCreate-PreStep-Function Lambda 函数
exports.handler = async function(event, context, callback) {
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err, err.stack);
// file does not exist
console.log("failed");
callback(null,false);
}
else {
console.log(data);
//file exist
console.log("succeeded");
var FoundNecessaryFiles = true;
// return FoundNecessaryFiles;
callback(null,event.FoundNecessaryFiles=true);
}
});
};
我已经尝试过多种方法,但一直无法正常工作。如您所见,我正在尝试使用 Lambda 函数传回状态为 true/false 的变量 FoundNecessaryFiles,具体取决于是否找到文件并使用该值来指导下一步的选择。最好通过修复变量回传来解决这个问题,因为稍后我可能需要在状态机中再次使用该方法,但我也愿意接受另一种解决方案,无论是组合步骤还是其他可能有效的解决方案.
另外,我在此过程中的下一步将是启动一个 AWS EMR 集群,前提是存在适当的文件,这我也不清楚如何完成。如果有人能够使用 Step Functions 在 运行 AWS EMR 集群中提供任何帮助,我将不胜感激。
我解决了传递变量的最初问题,但是,我仍然可以真正使用一些帮助通过 Step Functions 运行获得 EMR 集群。
对于那些可能遇到类似问题的人,我通过将我的 Lambda 函数更改为以下内容来解决我的变量传递问题
exports.handler = function(event, context, callback) {
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var params = {Bucket: 'BUCKET_NAME', Key: 'FILE_NAME'};
s3.getObject(params, function(err, data) {
if (err) {
console.log(err, err.stack);
// file does not exist
console.log("failed");
event.FoundNecessaryFiles = false;
callback(null,event);
}
else {
console.log(data);
//file exist
console.log("succeeded");
event.FoundNecessaryFiles = true;
callback(null,event);
}
});
};
我的下一个问题是设置 AWS EMR 集群。我目前的第一个任务是启动一个 EMR 集群,这可以通过直接使用 Step Function JSON,或者最好使用我在 S3 存储桶上找到的 JSON 集群配置文件来完成。我的下一个任务是更新 EMR 集群环境变量,我的 S3 存储桶上有一个 .sh 脚本可以执行此操作,我只是不知道如何使用 Step Functions 将其应用于 EMR 集群。我的第三个任务是向 EMR 集群添加一个包含 spark-submit 命令的步骤,该命令在我的 S3 存储桶上的 JSON 配置文件中进行了描述,可以以类似的方式上传到 EMR 集群在上一步中上传环境配置文件。最后,我想要一个确保 EMR 集群在完成 运行.
后终止的任务如果您对此提供任何帮助,我们将不胜感激,无论您是遵循我上面概述的结构还是知道改变结构的解决方案,我都很乐意提供任何帮助。
这可能对某些人有所帮助,因为 AWS 步函数中引入了新功能。
为了在 EMR 集群上设置变量或部署软件,应该使用 BootStrap 操作。您可以提供 bootstrap 脚本的 S3 路径。 Bootstrap 脚本作为集群创建过程的一部分运行。
可以使用 Step Functions 添加 EMR 步骤(同步或异步)。可以将 spark-submit 构建为 Args 数组。这是一个例子:
ARGS_ARRAY = [ "spark-submit",
"--master=yarn",
"--deploy-mode=cluster",
"--name=Generate Bulk File",
"--driver-cores=1",
"--driver-memory=2g",
"--executor-cores=5",
"--executor-memory=9g",
"/home/hadoop/charu/mySparkJob.py",
input_file_name,
output_file_name]
- 可以根据之前的 EMR 步骤将工作流中的最后一步添加为 'Terminate EMR Cluster',即如果之前的 EMR 步骤是同步的并成功完成,则只会终止集群。 (假设这就是一个人想要的)