Why does my EMR cluster terminate after the last step when started through Airflow?

I am using the code below to run EMR, copy a shell script to the cluster, and execute it.

cluster_creator == launches the EMR cluster

step_adder == executes the shell scripts

step_checker == checks whether the steps have completed.

My question is: why does the EMR cluster terminate after the last step, even though I specified CANCEL_AND_WAIT?

Please let me know, as I am new to Airflow. Does this have anything to do with XCom?

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.base_hook import BaseHook
import boto3
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
import pendulum
import json, os
import pytz
os.environ["AWS_ACCESS_KEY_ID"] = "A"
os.environ["AWS_SECRET_ACCESS_KEY"] = "2"

account_id = boto3.client('sts').get_caller_identity().get('Account')
if account_id=="8":
    path="prd-datahub"
elif account_id=="5":   
    path="datahub"
   
tz = pytz.timezone('US/Central')
DAG_ID = os.path.basename(__file__).replace(".py", "")
os.environ['AWS_DEFAULT_REGION'] = 'us-west-1'
def localize_utc_tz(d):
    return tz.fromutc(d)
def task_fail_slack_alert(context):
    SLACK_CONN_ID = 'slack'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :red_circle: Pitchbook Job Failed. 
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url} 
            *Error*:{exception}
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
            exception=context.get('exception')
        )
    failed_alert = SlackWebhookOperator(
        task_id='slack',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
        dag=dag)
    return failed_alert.execute(context=context)


DEFAULT_ARGS = {
    'owner': 'Turo Kee',
    'depends_on_past': False,
    'email': ['ging1.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

SPARK_STEPS = [
    {
        'Name': 'Copy Test Scripts',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', 's3://' + path + '-pyspark/pitchbook/run_test.sh', '/home/hadoop/'],
        },
    },
    {
        'Name': 'Execute Test Scripts',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['sh', '/home/hadoop/run_test.sh'],
        },
    },
]


JOB_FLOW_OVERRIDES = {
    'Name': 'Pitchbook ETL',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        
    },
   
    'BootstrapActions': [
        {
            'Name': 'Install Dependencies',
            'ScriptBootstrapAction': {
                'Path': 's3://'+path+'-pyspark/pitchbook/install_python_modules.sh',
            }
        }

    ],
    'Configurations': [
        {
            'Classification': 'core-site',
            'Properties': {
                'io.compression.codec.lzo.class': '',
                'io.compression.codecs': 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec',
            },
            'Configurations': [],
        }
    ],
    'VisibleToAllUsers': True,
    'EbsRootVolumeSize': 20,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

with DAG(
        dag_id=DAG_ID,
        description='Run built-in Spark app on Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=pendulum.datetime(2022, 4, 19),
        schedule_interval='05 7 * * *',
        user_defined_filters={
            'localtz': localize_utc_tz,
        },
        tags=['emr-dev'],
) as dag:
    
    
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
        on_failure_callback=task_fail_slack_alert,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        on_failure_callback=task_fail_slack_alert,
    )


    cluster_creator >> step_adder >> step_checker

Thanks for all your help.

This happens because you set 'KeepJobFlowAliveWhenNoSteps': False in JOB_FLOW_OVERRIDES. This is explained in the EMR docs:

If the JobFlowInstancesConfig KeepJobFlowAliveWhenNoSteps parameter is set to TRUE, the cluster transitions to the WAITING state rather than shutting down after the steps have completed.

Change it to 'KeepJobFlowAliveWhenNoSteps': True and the cluster will stay idle until you terminate it yourself. Note that CANCEL_AND_WAIT is a per-step ActionOnFailure setting: it only controls what happens to the remaining steps when a step fails, not whether the cluster auto-terminates once all steps have finished, and it has nothing to do with XCom.
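
If you still want Airflow to shut the cluster down once the pipeline is done (so it does not sit idle and cost money), a common pattern is to flip the flag to True and add an explicit terminate task. Below is a minimal sketch, assuming your existing task ids and the EmrTerminateJobFlowOperator import that is already in your file; the trigger_rule='all_done' choice is my assumption, so the cluster is removed even when a step fails:

# 1) In JOB_FLOW_OVERRIDES, keep the cluster in the WAITING state after the last step:
#        'KeepJobFlowAliveWhenNoSteps': True,
#
# 2) Inside the `with DAG(...) as dag:` block, after step_checker, add an explicit terminate task:
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='terminate_job_flow',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    trigger_rule='all_done',  # assumption: terminate even if a step fails, so the cluster is never left running
)

cluster_creator >> step_adder >> step_checker >> cluster_remover

With this wiring, termination is an explicit, visible task in the DAG rather than a side effect of the job-flow configuration.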