How to dynamically generate Airflow tasks in a loop and run them in parallel?
I have a use case where I download some JSON files and parse them. Depending on which files were downloaded, the program needs to load data into different tables. After the data has been loaded, an email notification must be sent.
For example, if the program needs to populate tables a and b (obtained from table_list), the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email.
If tables a, b, c, and d are obtained from table_list, the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email.
This is what I am trying. Can someone help?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime
from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp
default_args = {
    'start_date': datetime(2021, 5, 18)
}

with DAG(
    'Test DAG',
    default_args = default_args,
    catchup = False
) as dag:
    download = PythonOperator(
        task_id = 'download',
        python_callable = download,
        email_on_failure = True,
        email = 'example@example.com'
    )
    parse = PythonOperator(
        task_id = 'parse',
        python_callable = parse,
        email_on_failure = True,
        email = 'example@example.com'
    )
    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id = 'load_table_{}'.format(table),
                python_callable = insert_into_sf,
                email_on_failure = True,
                email = 'example@example.com',
                op_kwargs = {'category': table}
            )
        )
    send_email = EmailOperator(
        task_id = 'send_email',
        to = ['example@example.com'],
        subject = 'Airflow: Success',
        html_content = 'Dag run completed succesfully.'
    )
    download >> parse >> [task for task in task_list] >> send_email
If that is the shape of workflow you expect, then this will work:
with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:
    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='example@example.com'
    )
    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='example@example.com'
    )
    send_email = EmailOperator(
        task_id='send_email',
        to=['example@example.com'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )
    download_task >> parse_task
    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            email_on_failure=True,
            email='example@example.com',
            op_kwargs={'category': table}
        )
        parse_task >> op >> send_email
There is no need to build a list of tasks; you can set the upstream/downstream relationships dynamically by writing parse_task >> op >> send_email inside the for loop.
Tip: try to keep each task_id consistent with the task's variable name. This is not required, but it is good practice.
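To see why chaining inside the loop builds the fan-out/fan-in graph, here is a minimal stdlib-only sketch (not Airflow itself — the Task class and its downstream attribute are made up for illustration) of how an operator can overload >> the way Airflow's BaseOperator does:

```python
class Task:
    """Toy stand-in for an Airflow operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this task

    def __rshift__(self, other):
        # task >> other (or task >> [t1, t2]): mark as downstream.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.append(t.task_id)
        return other  # returning `other` is what makes a >> b >> c chain

    def __rrshift__(self, other):
        # Supports [t1, t2] >> task: a plain list has no __rshift__,
        # so Python falls back to the right-hand operand's __rrshift__.
        for t in other:
            t.downstream.append(self.task_id)
        return self


download = Task('download')
parse = Task('parse')
send_email = Task('send_email')

download >> parse
for table in ['a', 'b', 'c']:
    op = Task('load_table_{}'.format(table))
    parse >> op >> send_email  # fan out from parse, fan in to send_email

print(parse.downstream)  # ['load_table_a', 'load_table_b', 'load_table_c']
```

Each pass through the loop adds one load task downstream of parse and upstream of send_email, which is exactly the [load_table_a, load_table_b, ...] fan-out/fan-in shape the question asks for, without ever materialising the list.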