Airflow 2.0 中的 DAG 调度

DAG schedule in Airflow 2.0

如何在 Airflow 2.0 中安排 DAG,使其不会 运行 节假日?

问题 1:在每个月的第 5 个工作日运行? 问题 2:在每月的第 5 个工作日运行,如果第 5 天是假期那么它应该 运行 在不是假期的第二天运行?

目前无法做到这一点(至少不是本机)。 Airflow DAG 接受单个 cron 表达式或时间增量。如果你不能用其中之一说出所需的调度逻辑,那么你就不能在 Airflow 中进行这种调度。好消息是 Airflow AIP-39 Richer scheduler_interval 可以解决这个问题,并在未来的版本中提供更多的调度功能。

也就是说,您可以通过将 DAG 设置为 运行 和 schedule_interval="@daily" 并将 BranchPythonOperator 作为 DAG 的第一个任务来解决此问题。在 Python 可调用文件中,您可以编写所需的调度逻辑,这意味着如果是该月的第 5 个工作日,您的函数将 return True 否则将 return False 并且您的工作流将分支因此。对于 True 它将继续执行,对于 False 它将结束。这并不理想,但它会起作用。可能的模板可以是:

def check_fifth():
    #write the logic of finding if today is the 5th working day of the month

    if logic:
        return "continue_task"
    else:
        return "stop_task"

with DAG(dag_id='Whosebug',
         default_args=default_args,
         schedule_interval="@daily",
         catchup=False
         ) as dag:

    start_op = BranchPythonOperator(
        task_id='choose',
        python_callable=check_fifth
    )

    stop_op = DummyOperator(
        task_id='stop_task'
    )

    #replace with the operator that you want to actually execute
    continue_op = YourOperator(
        task_id='continue_task'
    )

    start_op >> [stop_op, continue_op]