How to dynamically generate Airflow tasks in a loop and run them in parallel?

I have a use case where I download some JSON files and parse them. Depending on the files downloaded, the program needs to load data into different tables. After the data is loaded into the tables, an email notification must be sent.

For example, if the program needs to populate tables a and b (obtained from table_list), then the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email

If tables a, b, c, and d are obtained from table_list, then the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email
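In plain Python, the set of load-task ids simply grows with table_list; a small illustration (table names assumed from the examples above, `load_task_ids` is a hypothetical helper, not part of the question's code):

```python
# Illustrative only: the load-task ids the workflow should contain
# for a given table_list (names assumed from the question).
def load_task_ids(table_list):
    return ['load_table_{}'.format(table) for table in table_list]

print(load_task_ids(['a', 'b']))
# ['load_table_a', 'load_table_b']
print(load_task_ids(['a', 'b', 'c', 'd']))
# ['load_table_a', 'load_table_b', 'load_table_c', 'load_table_d']
```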

This is what I am trying. Can someone help?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

from datetime import datetime

from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp


default_args = {
    'start_date': datetime(2021, 5, 18)
}


with DAG(
    'Test DAG',
    default_args = default_args,
    catchup = False
) as dag:

    download = PythonOperator(
        task_id = 'download',
        python_callable = download,
        email_on_failure = True,
        email = 'example@example.com'
    )

    parse = PythonOperator(
        task_id = 'parse',
        python_callable = parse,
        email_on_failure = True,
        email = 'example@example.com'
    )


    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id = 'load_table_{}'.format(table),
                python_callable = insert_into_sf,
                email_on_failure = True,
                email = 'example@example.com',
                op_kwargs = {'category': table}
            )
        )


    send_email = EmailOperator(
        task_id = 'send_email',
        to = ['example@example.com'],
        subject = 'Airflow: Success',
        html_content = 'Dag run completed successfully.'
    )

    download >> parse >> [task for task in task_list] >> send_email

    

If this is what you expect:

then this will work:

with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:
    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='example@example.com'
    )

    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='example@example.com'
    )


    send_email = EmailOperator(
        task_id='send_email',
        to=['example@example.com'],
        subject='Airflow: Success',
        html_content='Dag run completed successfully.'
    )

    download_task >> parse_task

    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
                task_id='load_table_{}'.format(table),
                python_callable=insert_into_sf,
                email_on_failure=True,
                email='example@example.com',
                op_kwargs={'category': table}
            )
        parse_task >> op >> send_email

There is no need to build a list; you can set the upstream/downstream relationships dynamically inside the for loop with parse_task >> op >> send_email.

Tip: try to keep your task_id consistent with the task's variable name. It is not required, but it is good practice.
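The loop-based wiring can be sketched without Airflow installed. Below, `FakeOperator` is a stand-in class (not Airflow's API) that mimics how Airflow's `>>` operator records a downstream dependency and returns the right-hand task, which is what makes `parse_task >> op >> send_email` chain correctly inside the loop:

```python
# A minimal sketch of the fan-out/fan-in pattern. FakeOperator and its
# attributes are illustrative stand-ins, not part of Airflow.
class FakeOperator:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Mimic Airflow's `a >> b`: record b as downstream of a,
        # then return b so the chain can continue.
        self.downstream.append(other)
        return other

parse_task = FakeOperator('parse_task')
send_email = FakeOperator('send_email')

for table in ['a', 'b', 'c']:
    op = FakeOperator('load_table_{}'.format(table))
    parse_task >> op >> send_email

print([t.task_id for t in parse_task.downstream])
# ['load_table_a', 'load_table_b', 'load_table_c']
```

Each load task ends up downstream of parse_task and upstream of send_email, which is exactly the shape download >> parse >> [load_table_a, load_table_b, load_table_c] >> send_email describes.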