如何根据 Airflow 中的不同日期为同一 DAG 设置不同的时间表

How can I set different schedules for the same DAG based on different days in Airflow

我有两个像这样定义的 DAG

my_dag= DAG('my_dag_thu_and_friday',
           catchup=False,
           default_args=default_args,
           schedule_interval='0 12,13,15,19 * * THU,FRI'
         ) 
my_dag= DAG('my_dag_sat_and_sun',
           catchup=False,
           default_args=default_args,
           schedule_interval='0 13,17 * * SAT,SUN'
         ) 

他们 运行 相同的操作员和相同的代码,但根据是 Thu/Fri 还是 Sat/Sun 的时间表不同。有没有一种方法可以指定 cron 间隔,这样我只有一个 DAG 可以有条件地处理调度?

谢谢

目前没有干净简单的解决方案,但是 AIP-39 Richer scheduler_interval 将解决这个问题,因此在未来的 Airflow 版本中这应该会变得容易。

目前您的选择是:

  1. 尝试使用 cron '0 12,13,15,17,19 * * THU,FRI,SAT,SUN' 创建 1 个 DAG,放置分支运算符以确定您是否应该执行 DummyOperator 或您的运算符。因此,例如 THU17 的 运行 Airflow 将执行 DummyOperator(因此什么都不做)。
  2. 继续使用 2 个 DAG。
  3. 保留 2 个 DAG,但使用 function that returns a DAG object,从而避免维护重复代码的需要。您不必维护重复的代码。 您可以编写一个 returns DAG 对象的函数:

.

def create_dag(dag_id, schedule, default_args):
    dag = DAG(
        dag_id,
        schedule_interval=schedule,
        default_args=default_args)
    with dag:
        task = BashOperator(task_id='my_task')
    return dag

list_of_dags = [
    ('my_dag_thu_and_friday', '0 12,13,15,19 * * THU,FRI'),
    ('my_dag_sat_and_sun', '0 13,17 * * SAT,SUN')
]

default_args = {'owner': 'airflow', ...}

for dag_item in list_of_dags:
    dag_id = dag_item[0]
    dag_schedule = dag_item[1]
    globals()[dag_id] = create_dag(
        dag_id,
        dag_schedule,
        default_args
    )