设置 Airflow DAG 字段的不同方式
Different way to set Airflow DAG fields
我想创建一个 Airflow DAG 并想了解应该在 field_1
vs default_args
vs args
?
中设置哪些参数
my_dag = DAG(
"my_dag",
"field_1"="xxx",
default_agrs=default_args,
**args
)
我查看了文档,我了解到某些参数如“owner
”必须通过default_args
设置,不能在field_1
中设置。但看起来大多数参数都没有区别。我测试了一些字段,例如“catchup
”和“on_failure_callback
”,它们在这三个地方都有效。
所以我想知道创建 dag 时设置参数的最佳做法是什么?
最佳实践类似于 Airflow 教程
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
...
参考:https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#example-pipeline-definition
但我用类似的东西就够了:
import pendulum
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
with DAG(
default_args=default_args,
dag_id='dag_etl',
catchup=False,
start_date=pendulum.datetime(year=2022, month=1, day=1, tz='America/Chicago'),
schedule_interval='0 8 * * *', # https://crontab.guru/#0_8_*_*_*
description='DAG Extract Transform Load'
) as dag:
...
我想创建一个 Airflow DAG 并想了解应该在 field_1
vs default_args
vs args
?
my_dag = DAG(
"my_dag",
"field_1"="xxx",
default_agrs=default_args,
**args
)
我查看了文档,我了解到某些参数如“owner
”必须通过default_args
设置,不能在field_1
中设置。但看起来大多数参数都没有区别。我测试了一些字段,例如“catchup
”和“on_failure_callback
”,它们在这三个地方都有效。
所以我想知道创建 dag 时设置参数的最佳做法是什么?
最佳实践类似于 Airflow 教程
with DAG(
'tutorial',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
...
参考:https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#example-pipeline-definition
但我用类似的东西就够了:
import pendulum
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
with DAG(
default_args=default_args,
dag_id='dag_etl',
catchup=False,
start_date=pendulum.datetime(year=2022, month=1, day=1, tz='America/Chicago'),
schedule_interval='0 8 * * *', # https://crontab.guru/#0_8_*_*_*
description='DAG Extract Transform Load'
) as dag:
...