Is it possible to execute Step Concurrency for AWS EMR through AWS STEP FUNCTION without Lambda?
Here is my scenario: I am trying to create 4 AWS EMR clusters, with 2 jobs assigned to each cluster, so it is 4 clusters running 8 jobs, all orchestrated through a Step Function.
My flow should look like this:
The 4 clusters start at the same time and run 8 jobs in parallel, with each cluster running 2 jobs in parallel.
Now, AWS recently launched this feature in EMR with StepConcurrencyLevel, which lets a cluster run 2 (or more) jobs concurrently to reduce cluster runtime; it can be set through the EMR console, the AWS CLI, or even through AWS Lambda.
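For example, a minimal boto3 sketch of the Lambda/Python route I am trying to avoid (the cluster ID and region are placeholders):

import boto3

# EMR client in the cluster's region (placeholder region)
emr = boto3.client('emr', region_name='us-west-2')

# ModifyCluster raises the step concurrency of a running cluster;
# the default level is 1, i.e. steps execute one at a time
emr.modify_cluster(
    ClusterId='j-XXXXXXXXXXXXX',  # placeholder cluster ID
    StepConcurrencyLevel=2
)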
However, I want to launch the process of running 2 (or more) jobs in parallel on a single cluster using AWS Step Functions and its state machine language, in the format referenced here: https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html
I have gone through many sites trying to find a way to do this; the solutions I found go through the console or through boto3 inside AWS Lambda, but I could not find a way to do it through a Step Function itself...
Is there a solution for this!?
Thanks in advance.
So, I went through several sites and found a solution to my problem...
The problem I had was with StepConcurrencyLevel, which I could set through the AWS console, through the AWS CLI, or even from Python with boto3... but I was looking for a solution in the state machine language itself, and I found one...
All we have to do is specify StepConcurrencyLevel (such as 2 or 3; the default is 1) when creating the cluster in the state machine language, then create 4 steps under that cluster and run the state machine.
The cluster recognizes the concurrency level that was set and runs the steps accordingly.
My sample process:
-> The JSON script of my orchestration:
{
  "StartAt": "Create_A_Cluster",
  "States": {
    "Create_A_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "WorkflowCluster",
        "StepConcurrencyLevel": 2,
        "Tags": [
          {
            "Key": "Description",
            "Value": "process"
          },
          {
            "Key": "Name",
            "Value": "filename"
          },
          {
            "Key": "Owner",
            "Value": "owner"
          },
          {
            "Key": "Project",
            "Value": "project"
          },
          {
            "Key": "User",
            "Value": "user"
          }
        ],
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.1",
        "Applications": [
          {
            "Name": "Spark"
          }
        ],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3://prefix/prefix/log.txt/",
        "Instances": {
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            }
          ]
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 1,
          "BackoffRate": 2.5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Fail_Cluster"
        }
      ],
      "ResultPath": "$.cluster",
      "OutputPath": "$.cluster",
      "Next": "Add_Steps_Parallel"
    },
    "Fail_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
        "Message.$": "$.Cause"
      },
      "Next": "Terminate_Cluster"
    },
    "Add_Steps_Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step_One",
          "States": {
            "Step_One": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The first step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs",
                  "Next": "Fail_SNS"
                }
              ],
              "ResultPath": "$.step1",
              "Next": "Terminate_Cluster_1"
            },
            "Fail_SNS": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs.Cause"
              },
              "ResultPath": "$.fail_cluster",
              "Next": "Terminate_Cluster_1"
            },
            "Terminate_Cluster_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Step_Two",
          "States": {
            "Step_Two": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The second step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs_1",
                  "Next": "Fail_SNS_1"
                }
              ],
              "ResultPath": "$.step2",
              "Next": "Terminate_Cluster_2"
            },
            "Fail_SNS_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs_1.Cause"
              },
              "ResultPath": "$.fail_cluster_1",
              "Next": "Terminate_Cluster_2"
            },
            "Terminate_Cluster_2": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.steps",
      "Next": "Terminate_Cluster"
    },
    "Terminate_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId"
      },
      "End": true
    }
  }
}
In this script, i.e. in the state machine language of AWS Step Functions, I set StepConcurrencyLevel to 2 while creating the cluster and added 2 Spark jobs as steps under the cluster.
When I ran this script in Step Functions, I was able to orchestrate the cluster and run 2 steps concurrently on it without configuring anything directly in the AWS EMR console, through the AWS CLI, or even through boto3.
I performed the orchestration of running 2 steps concurrently on a single cluster under AWS Step Functions using the state machine language alone, without the help of any other service such as Lambda, the Livy API, boto3, etc...
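The per-cluster concurrency needs no extra service; to fan out to the full scenario (4 clusters x 2 jobs = 8 parallel jobs), one option is simply to start 4 executions of this same state machine, for example with this sketch (the state machine ARN is a placeholder):

import json
import boto3

sfn = boto3.client('stepfunctions', region_name='us-west-2')

# Placeholder ARN of the state machine defined above
state_machine_arn = 'arn:aws:states:us-west-2:123456789012:stateMachine:emr_concurrent_steps'

# Each execution creates its own cluster with StepConcurrencyLevel 2,
# so 4 executions give 4 clusters running 8 steps concurrently
for i in range(4):
    sfn.start_execution(
        stateMachineArn=state_machine_arn,
        name=f'emr-cluster-run-{i}',  # execution names must be unique per state machine
        input=json.dumps({})
    )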
The flowchart is as follows: AWS Step Function Workflow for concurrent step execution (image).
To be more precise about where I inserted StepConcurrencyLevel in the state machine language above, it is under Create_A_Cluster:

"Create_A_Cluster": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
  "Parameters": {
    "Name": "WorkflowCluster",
    "StepConcurrencyLevel": 2,
    "Tags": [
      {
        "Key": "Description",
        "Value": "process"
      },
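To double-check that the level was actually applied, here is a quick boto3 sketch (the cluster ID is a placeholder) that reads the value back from DescribeCluster:

import boto3

emr = boto3.client('emr', region_name='us-west-2')

# DescribeCluster echoes the StepConcurrencyLevel the cluster is using
cluster = emr.describe_cluster(ClusterId='j-XXXXXXXXXXXXX')['Cluster']
print(cluster['StepConcurrencyLevel'])  # expected: 2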
Thank you.