Airflow - 动态任务和下游依赖
Airflow - Dynamic Tasks and Downstream Dependencies
我在 AWS MWAA v2.2.2 上继承了以下正在运行的 DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),  # NOTE(review): dynamic start_date; a fixed date is generally recommended
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id = 'snowflake'

# One external table is created per source entry; the real list is much larger.
sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:
    t0 = DummyOperator(task_id='start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
        )
        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id='end')

    # BUG (the subject of the question): `external_tables_from_s3` is rebound on
    # every loop iteration, so only the task created last is wired between t0 and
    # t1; the other dynamically created tasks get no dependencies at all.
    t0 >> external_tables_from_s3 >> t1
设置此 DAG 以便 external_tables_from_s3 任务可以并行运行的最佳方法是什么?
基本上我想要的是
t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1
我想知道在不必单独指定每个任务的情况下实现此目标的最佳方法是什么。源列表比这个大很多,只是针对这个问题进行了缩减
根据您希望如何在 UI 中可视化 DAG,有几个选项:
- 使用一个列表包含所有 SnowflakeOperator 任务,或
- 使用 TaskGroup。
选项 1:
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    t0 = DummyOperator(task_id="start")

    # Collect every dynamically created task in one list so a single
    # dependency statement can fan out over all of them.
    snowflake_tasks = [
        SnowflakeOperator(
            task_id=f"create_external_table_for_{src['name']}",
            sql=f"CREATE OR REPLACE EXTERNAL TABLE {src['name']} with location = @stage_dev/{src['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);",
            snowflake_conn_id=snowflake_conn_id,
        )
        for src in sources
    ]

    t1 = DummyOperator(task_id="end")

    # A list on either side of >> creates parallel edges: t0 fans out to all
    # snowflake tasks, and they all fan back in to t1.
    t0 >> snowflake_tasks >> t1
选项 2:
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup

snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    t0 = DummyOperator(task_id="start")

    # Every task instantiated inside the TaskGroup context is registered in
    # the group, which renders as a single collapsible node in the UI.
    with TaskGroup(group_id="snowflake_tasks") as snowflake_tasks:
        for src in sources:
            SnowflakeOperator(
                task_id=f"create_external_table_for_{src['name']}",
                sql=f"CREATE OR REPLACE EXTERNAL TABLE {src['name']} with location = @stage_dev/{src['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);",
                snowflake_conn_id=snowflake_conn_id,
            )

    t1 = DummyOperator(task_id="end")

    # Depending on the group wires t0 before every task in it and t1 after,
    # so the per-source tasks run in parallel.
    t0 >> snowflake_tasks >> t1
我在 AWS MWAA v2.2.2 上继承了以下正在运行的 DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),  # NOTE(review): dynamic start_date; a fixed date is generally recommended
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id = 'snowflake'

# One external table is created per source entry; the real list is much larger.
sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:
    t0 = DummyOperator(task_id='start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
        )
        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id='end')

    # BUG (the subject of the question): `external_tables_from_s3` is rebound on
    # every loop iteration, so only the task created last is wired between t0 and
    # t1; the other dynamically created tasks get no dependencies at all.
    t0 >> external_tables_from_s3 >> t1
设置此 DAG 以便 external_tables_from_s3 任务可以并行运行的最佳方法是什么?基本上我想要的是
t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1
我想知道在不必单独指定每个任务的情况下实现此目标的最佳方法是什么。源列表比这个大很多,只是针对这个问题进行了缩减
根据您希望如何在 UI 中可视化 DAG,有几个选项:
- 使用一个列表包含所有 SnowflakeOperator 任务,或
- 使用 TaskGroup。
选项 1:
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    t0 = DummyOperator(task_id="start")

    # Collect every dynamically created task in one list so a single
    # dependency statement can fan out over all of them.
    snowflake_tasks = [
        SnowflakeOperator(
            task_id=f"create_external_table_for_{src['name']}",
            sql=f"CREATE OR REPLACE EXTERNAL TABLE {src['name']} with location = @stage_dev/{src['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);",
            snowflake_conn_id=snowflake_conn_id,
        )
        for src in sources
    ]

    t1 = DummyOperator(task_id="end")

    # A list on either side of >> creates parallel edges: t0 fans out to all
    # snowflake tasks, and they all fan back in to t1.
    t0 >> snowflake_tasks >> t1
选项 2:
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup

snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]

# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    t0 = DummyOperator(task_id="start")

    # Every task instantiated inside the TaskGroup context is registered in
    # the group, which renders as a single collapsible node in the UI.
    with TaskGroup(group_id="snowflake_tasks") as snowflake_tasks:
        for src in sources:
            SnowflakeOperator(
                task_id=f"create_external_table_for_{src['name']}",
                sql=f"CREATE OR REPLACE EXTERNAL TABLE {src['name']} with location = @stage_dev/{src['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);",
                snowflake_conn_id=snowflake_conn_id,
            )

    t1 = DummyOperator(task_id="end")

    # Depending on the group wires t0 before every task in it and t1 after,
    # so the per-source tasks run in parallel.
    t0 >> snowflake_tasks >> t1