Airflow - Dynamic Tasks and Downstream Dependencies

I have inherited the following DAG, which is running on AWS MWAA v2.2.2:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id='snowflake'

sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
 ]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:

    t0 = DummyOperator(task_id = 'start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
            )

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id = 'end')

    t0 >> external_tables_from_s3 >> t1

What is the best way to set up this DAG so that the external_tables_from_s3 tasks run in parallel?

Basically, what I want is:

t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1

I would like to know the best way to achieve this without having to specify each task individually. The list of sources is much larger than this; it has been cut down for this question.

Depending on how you want the DAG to be visualized in the UI, there are a couple of options:

  1. use a list containing all of the SnowflakeOperator tasks, or
  2. use a TaskGroup.

Option 1:

from pendulum import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


snowflake_conn_id = "snowflake"

sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:

    t0 = DummyOperator(task_id="start")

    snowflake_tasks = []
    for source in sources:
        create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id,
        )
        snowflake_tasks.append(external_tables_from_s3)

    t1 = DummyOperator(task_id="end")

    t0 >> snowflake_tasks >> t1
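
This works because the bitshift operators accept a plain Python list of tasks: t0 fans out to every task in snowflake_tasks, and each of those fans back in to t1. If you prefer an explicit helper, the same wiring can be expressed with chain; a rough equivalent, reusing the t0, snowflake_tasks, and t1 objects defined above:

from airflow.models.baseoperator import chain

# Inside the "with DAG(...) as dag:" block, instead of the bitshift line.
# Equivalent to t0 >> snowflake_tasks >> t1: fan out from t0 to every
# task in the list, then fan back in to t1.
chain(t0, snowflake_tasks, t1)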

Option 2:

from pendulum import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup


snowflake_conn_id = "snowflake"

sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:

    t0 = DummyOperator(task_id="start")

    with TaskGroup(group_id="snowflake_tasks") as snowflake_tasks:
        for source in sources:
            create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"

            external_tables_from_s3 = SnowflakeOperator(
                task_id=f"create_external_table_for_{source['name']}",
                sql=create_table_sql,
                snowflake_conn_id=snowflake_conn_id,
            )

    t1 = DummyOperator(task_id="end")

    t0 >> snowflake_tasks >> t1
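
With this version, the Graph view collapses the per-source tasks into a single snowflake_tasks node, and the group_id is prefixed onto each task_id (for example, snowflake_tasks.create_external_table_for_test_1). Since your real list of sources is much larger, you may also want to cap how many of these tasks run at once; one way is an Airflow pool. A minimal sketch, assuming a pool named snowflake_pool has been created under Admin -> Pools (the pool name here is hypothetical):

external_tables_from_s3 = SnowflakeOperator(
    task_id=f"create_external_table_for_{source['name']}",
    sql=create_table_sql,
    snowflake_conn_id=snowflake_conn_id,
    # Hypothetical pool; its slot count bounds how many of these
    # tasks the scheduler will run concurrently.
    pool="snowflake_pool",
)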