气流DAG调度月的最后一天-n天

Airflow DAG Scheduling last day of month -n days

我想将我的 dag 安排在每月最后一天的前 3 天 运行,因此对于 2 月,我的 dag 应该 运行 在 25,而对于 3 月,dag 应该 运行第28天。关于如何安排这个的任何想法?

谢谢

对于气流 >= 2.2.0:

AIP-39 Richer scheduler_interval is available. You can define your own Timetable for the scheduling. There is How to guide customize DAG scheduling with Timetables。您需要通过插件注册时间表并定义调度逻辑。

对于 Airflow < 2.2.0:

只有当您可以在单个 cron 表达式中“说出来”时,您才能安排 DAG。如果您的日程安排愿望不适合 cron 表达式,那么您不能开箱即用。但是,您可以找到一个足够接近您希望的 cron 表达式(0 0 25-31 * * - 从 28 到 31 的每一天)并在您的 DAG 的开头放置一个 ShortCircuitOperator验证日期是否实际上是月底前 3 天。如果日期匹配则继续执行下游任务如果日期不匹配则跳过下游任务:

import calendar
from datetime import datetime, date, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 21)

}


def check_if_last_day_of_month(execution_date):
    #  calendar.monthrange return a tuple (weekday of first day of the
    #  month, number
    #  of days in month)
    run_date = datetime.fromtimestamp(execution_date.timestamp())
    last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
    # check if date is 3 days behind the last day of the month
    if run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3):
        return True
    return False


with DAG(
    dag_id='short_example',
    schedule_interval="@once",
    default_args=default_args,
) as dag:
    first = ShortCircuitOperator(
        task_id='verify_date',
        python_callable=check_if_last_day_of_month
    )

    second = DummyOperator(task_id='task')

    first >> second

示例 运行 2021-01-30:

示例 运行 2021-01-28:

注意:确保您比较的是您感兴趣的日期。在示例中,我比较了 DAG 的 execution_date