Airflow DAG does not skip tasks after BranchPythonOperator or ShortCircuitOperator
I am writing a DAG with a BranchPythonOperator that checks whether data is available for download. If the data is there, the DAG should download it and merge it into my PostgreSQL database. If it is not, all of the processing tasks should be skipped and the branch should go to the DummyOperator. Unfortunately, the DAG does not skip all of the tasks: it skips up to six of them, then stops (the remaining downstream tasks are left in an unknown state) and the DAG run fails. I could not find any error messages in the logs, since none of the tasks actually failed.
Airflow version is 1.8.1. I have attached some screenshots below. In the example DAG below, I have replaced sensitive file information with 'XXXXX'. I also tried the ShortCircuitOperator, but that only skipped the tasks downstream of the SCO (a sketch of that variant follows the DAG below).
Thanks!
from airflow import DAG
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {'owner': 'airflow',
                'depends_on_past': False,
                'start_date': datetime(2018, 1, 2),
                'email_on_failure': False,
                'email_on_retry': False,
                'retries': 1,
                'retry_delay': timedelta(minutes=1)
                }
dag = DAG('Question_DAG',
          template_searchpath="/home/airflow/code",
          schedule_interval='0 10 * * *',
          catchup=True,
          default_args=default_args
          )
data_check = SSHExecuteOperator(task_id='check_for_new_data_and_download',
                                ssh_hook=SSHHook(conn_id='server'),
                                bash_command='download_file.sh',
                                xcom_push=True,
                                dag=dag)
def identify_new_data(**kwargs):
    ''' If DATA has not been uploaded, the DAG does not continue.'''
    new_data_code = kwargs['task_instance'].xcom_pull(task_ids='check_for_new_data_and_download', key=None)
    filecount_type_conversion_success = True
    try:
        new_data_code = int(new_data_code)
    except ValueError:
        filecount_type_conversion_success = False
    # print new_data_code, type(new_data_code)
    # 1 means there is new data, therefore it should update the data tables.
    # 2 means source data was not uploaded.
    if new_data_code == 1:
        return 'data_uploaded'
    elif new_data_code == 2 or new_data_code == 3:
        return 'no_data_uploaded'
identify_new_data = BranchPythonOperator(task_id='identify_new_data',
                                         python_callable=identify_new_data,
                                         trigger_rule="all_done",
                                         provide_context=True,
                                         dag=dag)
no_data_uploaded = DummyOperator(task_id="no_data_uploaded",
                                 trigger_rule='all_done',
                                 dag=dag)
data_uploaded = EmailOperator(task_id='data_uploaded',
                              to='myemail@google',
                              subject='File Downloaded',
                              html_content='Hello, This is an auto-generated email to inform you that the monthly data has been downloaded. Thank you.',
                              dag=dag)
################# create_raw_table ################################
create_raw_table = PostgresOperator(task_id='create_raw_table',
                                    postgres_conn_id='warehouse',
                                    sql='create_raw_table.sql',
                                    dag=dag)
################# Convert fixed width file to csv ################################
convert_fixed_width_csv = SSHExecuteOperator(task_id='convert_fixed_width_csv',
                                             ssh_hook=SSHHook(conn_id='server'),
                                             bash_command='convert_fixed_width_csv.sh',
                                             dag=dag)
################# Dedupe ##############
dedupe_on_id = PostgresOperator(task_id='dedupe_on_id',
                                postgres_conn_id='warehouse',
                                sql='dedupe.sql',
                                dag=dag)
################# Date Insert ################################
date_insert = PostgresOperator(task_id='add_dates_raw',
                               postgres_conn_id='warehouse',
                               sql='add_dates.sql',
                               dag=dag)
################# Client Insert ###########################
client_insert = PostgresOperator(task_id='client_insert',
                                 postgres_conn_id='warehouse',
                                 sql='client_insert.sql',
                                 dag=dag)
################# Months Insert ###########################
months_insert = PostgresOperator(task_id='months_insert',
                                 postgres_conn_id='warehouse',
                                 sql='months_insert.sql',
                                 dag=dag)
################# Eligibility Insert ######################
eligibility_insert = PostgresOperator(task_id='eligibility_insert',
                                      postgres_conn_id='warehouse',
                                      sql='eligibility_insert.sql',
                                      dag=dag)
################# Plan Insert ####################
plan_insert = PostgresOperator(task_id='plan_insert',
                               postgres_conn_id='warehouse',
                               sql='plan_insert.sql',
                               dag=dag)
################# Codes ###################################
codes = PostgresOperator(task_id='codes',
                         postgres_conn_id='warehouse',
                         sql='codes.sql',
                         dag=dag)
################# Update Dates ################################
update_dates = PostgresOperator(task_id='update_dates',
                                postgres_conn_id='warehouse',
                                sql='update_dates.sql',
                                dag=dag)
################# Clients ################################
create_clients = PostgresOperator(task_id='create_clients',
                                  postgres_conn_id='warehouse',
                                  sql='clients.sql',
                                  dag=dag)
################# fix_addresses ############
fix_addresses = SSHExecuteOperator(task_id='fix_addresses',
                                   ssh_hook=SSHHook(conn_id='server'),
                                   bash_command='fix_addresses.sh',
                                   dag=dag)
################# Load data ############
load_data_command = """
cd data/
TASKDATE=$(date +%Y%m)
cp XXXX.TXT /home/admin/data/XXX_loaded/XXX.TXT
"""
load_data = SSHExecuteOperator(task_id='load_data',
                               ssh_hook=SSHHook(conn_id='server'),
                               bash_command=load_data_command,
                               dag=dag)
################# Update system status ################################
system_status = PostgresOperator(task_id='update_system_status',
                                 postgres_conn_id='warehouse',
                                 sql="SELECT update_system_status('new_info')",
                                 dag=dag)
data_check.set_downstream(identify_new_data)
identify_new_data.set_downstream(data_uploaded)
# Both branch targets must be direct downstream tasks of the branch operator,
# otherwise 'no_data_uploaded' can never be chosen.
identify_new_data.set_downstream(no_data_uploaded)
data_uploaded.set_downstream(create_raw_table)
create_raw_table.set_downstream(convert_fixed_width_csv)
convert_fixed_width_csv.set_downstream(dedupe_on_id)
dedupe_on_id.set_downstream(date_insert)
date_insert.set_downstream(client_insert)
client_insert.set_downstream(months_insert)
months_insert.set_downstream(eligibility_insert)
eligibility_insert.set_downstream(plan_insert)
plan_insert.set_downstream(codes)
codes.set_downstream(update_dates)
update_dates.set_downstream(create_clients)
create_clients.set_downstream(fix_addresses)
fix_addresses.set_downstream(load_data)
load_data.set_downstream(system_status)
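For reference, this is roughly how I wired in the ShortCircuitOperator when I tried that route (a minimal sketch rather than my exact code; the callable and task names here are made up):

from airflow.operators.python_operator import ShortCircuitOperator

def data_is_available(**kwargs):
    '''Return True to let the DAG continue, False to skip everything downstream.'''
    new_data_code = kwargs['task_instance'].xcom_pull(task_ids='check_for_new_data_and_download')
    return str(new_data_code).strip() == '1'

check_data = ShortCircuitOperator(task_id='check_data',
                                  python_callable=data_is_available,
                                  provide_context=True,
                                  dag=dag)

# check_data would sit between data_check and create_raw_table, replacing the branch:
# data_check.set_downstream(check_data)
# check_data.set_downstream(create_raw_table)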
The attached screenshots show the tree view in the Airflow UI, where I am trying to troubleshoot why the run fails even though none of the tasks actually failed.
[Screenshot: DAG tasks not skipping]
[Screenshot: DAG tasks]
I believe you are running into the same issue described in AIRFLOW-1296. It was fixed in Airflow 1.8.2, so I would upgrade and see whether you can still reproduce it. That worked for me, although, as you can see in the comments on the issue, results have been mixed.
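If you want to sanity-check the fix after upgrading, a toy DAG along these lines (a hedged sketch; the DAG and task names are invented) should show the skip cascading through the whole chain once the branch picks the dummy path, rather than stalling partway:

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('branch_skip_test',
          start_date=datetime(2018, 1, 2),
          schedule_interval=None)

branch = BranchPythonOperator(task_id='branch',
                              python_callable=lambda: 'stop',  # always take the skip path
                              dag=dag)
stop = DummyOperator(task_id='stop', dag=dag)
branch.set_downstream(stop)

# Build a chain longer than six tasks, since that is roughly where
# the skip cascade stalled in the broken version.
prev = branch
for i in range(10):
    step = DummyOperator(task_id='step_%d' % i, dag=dag)
    prev.set_downstream(step)
    prev = step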