Why does my EMR cluster terminate after the last step when started through Airflow?
I'm using the code below to run EMR: it copies a shell script and then executes it.
cluster_creator == launches the cluster on EMR
step_adder == executes the shell scripts
step_checker == checks whether the step has completed.
My question is: why does EMR terminate after the last step, even though I specified CANCEL_AND_WAIT?
Please let me know, as I'm new to Airflow. Does this have something to do with XCom?
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.base_hook import BaseHook
import boto3
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
import pendulum
import json, os
import pytz
os.environ["AWS_ACCESS_KEY_ID"] = "A"
os.environ["AWS_SECRET_ACCESS_KEY"] = "2"
account_id = boto3.client('sts').get_caller_identity().get('Account')
if account_id=="8":
    path="prd-datahub"
elif account_id=="5":
    path="datahub"
tz = pytz.timezone('US/Central')
DAG_ID = os.path.basename(__file__).replace(".py", "")
os.environ['AWS_DEFAULT_REGION'] = 'us-west-1'
def localize_utc_tz(d):
    return tz.fromutc(d)
def task_fail_slack_alert(context):
    SLACK_CONN_ID = 'slack'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
        :red_circle: Pitchbook Job Failed.
        *Task*: {task}
        *Dag*: {dag}
        *Execution Time*: {exec_date}
        *Log Url*: {log_url}
        *Error*:{exception}
        """.format(
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url,
        exception=context.get('exception')
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)
DEFAULT_ARGS = {
    'owner': 'Turo Kee',
    'depends_on_past': False,
    'email': ['ging1.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}
SPARK_STEPS = [
    {
        'Name': 'Copy Test Scripts',
        "ActionOnFailure": "CANCEL_AND_WAIT",
        'HadoopJarStep': {
            "Jar": "command-runner.jar",
            "Args": ["aws","s3","cp","s3://"+path+"-pyspark/pitchbook/run_test.sh","/home/hadoop/"],
        }
    },
    {
        'Name': 'Execute Test Scripts',
        "ActionOnFailure": "CANCEL_AND_WAIT",
        'HadoopJarStep': {
            "Jar": "command-runner.jar",
            "Args": ["sh","/home/hadoop/run_test.sh"],
        }
    }
]
JOB_FLOW_OVERRIDES = {
    'Name': 'Pitchbook ETL',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'BootstrapActions': [
        {
            'Name': 'Install Dependencies',
            'ScriptBootstrapAction': {
                'Path': 's3://'+path+'-pyspark/pitchbook/install_python_modules.sh',
            }
        }
    ],
    'Configurations': [{"Classification":"core-site", "Properties":{"io.compression.codec.lzo.class":"", "io.compression.codecs":"org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"}, "Configurations":[]}],
    'VisibleToAllUsers': True,
    'EbsRootVolumeSize': 20,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}
with DAG(
    dag_id=DAG_ID,
    description='Run built-in Spark app on Amazon EMR',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=pendulum.datetime(2022, 4, 19),
    schedule_interval='05 7 * * *',
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
    tags=['emr-dev'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES)
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
        on_failure_callback=task_fail_slack_alert)
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        on_failure_callback=task_fail_slack_alert,)
    cluster_creator >> step_adder >> step_checker
Thanks for all your help.
This happens because you set 'KeepJobFlowAliveWhenNoSteps': False in JOB_FLOW_OVERRIDES. CANCEL_AND_WAIT is an ActionOnFailure setting, so it only controls what happens when a step fails; it does not keep the cluster alive after the last step finishes.
This is explained in the EMR docs:
If the JobFlowInstancesConfig KeepJobFlowAliveWhenNoSteps parameter is set to TRUE, the cluster transitions to the WAITING state rather than shutting down after the steps have completed.
Change it to 'KeepJobFlowAliveWhenNoSteps': True and the cluster will stay idle until you terminate it.
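A minimal sketch of that change, assuming the rest of the DAG stays as posted (the cluster_remover task name and remove_cluster task_id are illustrative; the task would go inside the same with DAG block and reuses the EmrTerminateJobFlowOperator import already at the top of your file):

# Inside JOB_FLOW_OVERRIDES['Instances'], keep the cluster up after the last step:
#     'KeepJobFlowAliveWhenNoSteps': True,

# Because the cluster now idles in the WAITING state instead of shutting down,
# terminate it explicitly once the step sensor succeeds.
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
)

cluster_creator >> step_adder >> step_checker >> cluster_remover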