boto EMR add step and auto terminate
Python 2.7.12
boto3==1.3.1
How can I add a step to a running EMR cluster and have the cluster terminate once the step completes, whether it fails or succeeds?
Create the cluster:
response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)
Add a step:
response = client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[
        {
            'Name': 'Run Step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Args': [
                    'spark-submit',
                    '--deploy-mode', 'cluster',
                    '--py-files',
                    's3://mybucket/code/spark/spark_udfs.py',
                    's3://mybucket/code/spark/{}'.format(spark_script),
                    '--some-arg'
                ],
                'Jar': 'command-runner.jar'
            }
        }
    ]
)
This successfully adds a step and runs it. However, once the step finishes, I want the cluster to terminate automatically, as described for the AWS CLI: http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html
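For reference, the CLI behaviour referred to above is the --auto-terminate flag of aws emr create-cluster; an illustrative sketch, not part of the original question's code:

aws emr create-cluster \
    --release-label emr-5.9.0 \
    --applications Name=Spark Name=Hadoop \
    --instance-type m4.large --instance-count 3 \
    --use-default-roles \
    --auto-terminate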
In your case (creating the cluster with boto3) you can add the flags
'TerminationProtected': False, 'AutoTerminate': True,
to your cluster creation call. That way the cluster shuts down once your steps have finished running.
Another solution is to add one more step, immediately after the step you want to run, that terminates the cluster. So essentially you need to run this command as a step:
aws emr terminate-clusters --cluster-ids your_cluster_id
The tricky part is retrieving the cluster_id.
You can find some solutions here: Does an EMR master node know it's cluster id?
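A minimal sketch of that approach, assuming the caller keeps the cluster id returned by run_job_flow and that the cluster's EC2 instance profile is allowed to call terminate-clusters (on the master node itself the id can also be read from /mnt/var/lib/info/job-flow.json):

cluster_id = response['JobFlowId']
client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[
        {
            'Name': 'Terminate cluster',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                # command-runner.jar runs the AWS CLI on the master node;
                # the instance profile needs permission to terminate the cluster
                'Jar': 'command-runner.jar',
                'Args': ['aws', 'emr', 'terminate-clusters',
                         '--cluster-ids', cluster_id],
            },
        },
    ],
)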
The suggested 'AutoTerminate': True parameter did not work for me. However, it worked once I changed the 'KeepJobFlowAliveWhenNoSteps' parameter from True to False. Your code should look like this:
response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': False,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)
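Note that with 'KeepJobFlowAliveWhenNoSteps': False the cluster begins shutting down as soon as it has no pending steps, so the step is typically passed in the same run_job_flow call via the Steps parameter rather than added afterwards with add_job_flow_steps. A minimal sketch reusing the step from the question (assuming the same variables as above):

response = client.run_job_flow(
    Name=name,
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        # Cluster terminates once all submitted steps have completed.
        'KeepJobFlowAliveWhenNoSteps': False,
    },
    Steps=[
        {
            'Name': 'Run Step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'spark-submit', '--deploy-mode', 'cluster',
                    '--py-files', 's3://mybucket/code/spark/spark_udfs.py',
                    's3://mybucket/code/spark/{}'.format(spark_script),
                    '--some-arg',
                ],
            },
        },
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)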
You can create a short-lived cluster that automatically terminates after all its steps have run by specifying 'KeepJobFlowAliveWhenNoSteps': False in the Instances parameter. I've added a complete example on GitHub that shows how to do this.
Here is some of the code from that demo:
def run_job_flow(
        name, log_uri, keep_alive, applications, job_flow_role, service_role,
        security_groups, steps, emr_client):
    try:
        response = emr_client.run_job_flow(
            Name=name,
            LogUri=log_uri,
            ReleaseLabel='emr-5.30.1',
            Instances={
                'MasterInstanceType': 'm5.xlarge',
                'SlaveInstanceType': 'm5.xlarge',
                'InstanceCount': 3,
                'KeepJobFlowAliveWhenNoSteps': keep_alive,
                'EmrManagedMasterSecurityGroup': security_groups['manager'].id,
                'EmrManagedSlaveSecurityGroup': security_groups['worker'].id,
            },
            Steps=[{
                'Name': step['name'],
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit', '--deploy-mode', 'cluster',
                             step['script_uri'], *step['script_args']]
                }
            } for step in steps],
            Applications=[{
                'Name': app
            } for app in applications],
            JobFlowRole=job_flow_role.name,
            ServiceRole=service_role.name,
            EbsRootVolumeSize=10,
            VisibleToAllUsers=True
        )
        cluster_id = response['JobFlowId']
        logger.info("Created cluster %s.", cluster_id)
    except ClientError:
        logger.exception("Couldn't create cluster.")
        raise
    else:
        return cluster_id
And here is some code that calls this function with actual parameters:
output_prefix = 'pi-calc-output'
pi_step = {
    'name': 'estimate-pi-step',
    'script_uri': f's3://{bucket_name}/{script_key}',
    'script_args':
        ['--partitions', '3', '--output_uri',
         f's3://{bucket_name}/{output_prefix}']
}

cluster_id = emr_basics.run_job_flow(
    f'{prefix}-cluster', f's3://{bucket_name}/logs',
    False, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
    security_groups, [pi_step], emr_client)
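If the calling code needs to block until the short-lived cluster has finished its steps and terminated, boto3's built-in EMR waiter can be used; a small sketch (the WaiterConfig values here are arbitrary):

# Polls DescribeCluster until the cluster reaches a terminated state.
waiter = emr_client.get_waiter('cluster_terminated')
waiter.wait(
    ClusterId=cluster_id,
    WaiterConfig={'Delay': 30, 'MaxAttempts': 60},
)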