如何根据 Airflow 中的不同日期为同一 DAG 设置不同的时间表
How can I set different schedules for the same DAG based on different days in Airflow
我有两个像这样定义的 DAG
my_dag= DAG('my_dag_thu_and_friday',
catchup=False,
default_args=default_args,
schedule_interval='0 12,13,15,19 * * THU,FRI'
)
my_dag= DAG('my_dag_sat_and_sun',
catchup=False,
default_args=default_args,
schedule_interval='0 13,17 * * SAT,SUN'
)
他们 运行 相同的操作员和相同的代码,但根据是 Thu/Fri 还是 Sat/Sun 的时间表不同。有没有一种方法可以指定 cron 间隔,这样我只有一个 DAG 可以有条件地处理调度?
谢谢
目前没有干净简单的解决方案,但是 AIP-39 Richer scheduler_interval 将解决这个问题,因此在未来的 Airflow 版本中这应该会变得容易。
目前您的选择是:
- 尝试使用 cron
'0 12,13,15,17,19 * * THU,FRI,SAT,SUN'
创建 1 个 DAG,放置分支运算符以确定您是否应该执行 DummyOperator
或您的运算符。因此,例如 THU
中 17
的 运行 Airflow 将执行 DummyOperator(因此什么都不做)。
- 继续使用 2 个 DAG。
- 保留 2 个 DAG,但使用 function that returns a DAG object,从而避免维护重复代码的需要。您不必维护重复的代码。
您可以编写一个 returns DAG 对象的函数:
.
def create_dag(dag_id, schedule, default_args):
dag = DAG(
dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
task = BashOperator(task_id='my_task')
return dag
list_of_dags = [
('my_dag_thu_and_friday', '0 12,13,15,19 * * THU,FRI'),
('my_dag_sat_and_sun', '0 13,17 * * SAT,SUN')
]
default_args = {'owner': 'airflow', ...}
for dag_item in list_of_dags:
dag_id = dag_item[0]
dag_schedule = dag_item[1]
globals()[dag_id] = create_dag(
dag_id,
dag_schedule,
default_args
)
我有两个像这样定义的 DAG
my_dag= DAG('my_dag_thu_and_friday',
catchup=False,
default_args=default_args,
schedule_interval='0 12,13,15,19 * * THU,FRI'
)
my_dag= DAG('my_dag_sat_and_sun',
catchup=False,
default_args=default_args,
schedule_interval='0 13,17 * * SAT,SUN'
)
他们 运行 相同的操作员和相同的代码,但根据是 Thu/Fri 还是 Sat/Sun 的时间表不同。有没有一种方法可以指定 cron 间隔,这样我只有一个 DAG 可以有条件地处理调度?
谢谢
目前没有干净简单的解决方案,但是 AIP-39 Richer scheduler_interval 将解决这个问题,因此在未来的 Airflow 版本中这应该会变得容易。
目前您的选择是:
- 尝试使用 cron
'0 12,13,15,17,19 * * THU,FRI,SAT,SUN'
创建 1 个 DAG,放置分支运算符以确定您是否应该执行DummyOperator
或您的运算符。因此,例如THU
中17
的 运行 Airflow 将执行 DummyOperator(因此什么都不做)。 - 继续使用 2 个 DAG。
- 保留 2 个 DAG,但使用 function that returns a DAG object,从而避免维护重复代码的需要。您不必维护重复的代码。 您可以编写一个 returns DAG 对象的函数:
.
def create_dag(dag_id, schedule, default_args):
dag = DAG(
dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
task = BashOperator(task_id='my_task')
return dag
list_of_dags = [
('my_dag_thu_and_friday', '0 12,13,15,19 * * THU,FRI'),
('my_dag_sat_and_sun', '0 13,17 * * SAT,SUN')
]
default_args = {'owner': 'airflow', ...}
for dag_item in list_of_dags:
dag_id = dag_item[0]
dag_schedule = dag_item[1]
globals()[dag_id] = create_dag(
dag_id,
dag_schedule,
default_args
)