气流DAG调度月的最后一天-n天
Airflow DAG Scheduling last day of month -n days
我想将我的 dag 安排在每月最后一天的前 3 天 运行,因此对于 2 月,我的 dag 应该 运行 在 25,而对于 3 月,dag 应该 运行第28天。关于如何安排这个的任何想法?
谢谢
对于气流 >= 2.2.0:
AIP-39 Richer scheduler_interval is available. You can define your own Timetable for the scheduling. There is How to guide customize DAG scheduling with Timetables。您需要通过插件注册时间表并定义调度逻辑。
对于 Airflow < 2.2.0:
只有当您可以在单个 cron 表达式中“说出来”时,您才能安排 DAG。如果您的日程安排愿望不适合 cron 表达式,那么您不能开箱即用。但是,您可以找到一个足够接近您希望的 cron 表达式(0 0 25-31 * *
- 从 28 到 31 的每一天)并在您的 DAG 的开头放置一个 ShortCircuitOperator
验证日期是否实际上是月底前 3 天。如果日期匹配则继续执行下游任务如果日期不匹配则跳过下游任务:
import calendar
from datetime import datetime, date, timedelta
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 8, 21)
}
def check_if_last_day_of_month(execution_date):
# calendar.monthrange return a tuple (weekday of first day of the
# month, number
# of days in month)
run_date = datetime.fromtimestamp(execution_date.timestamp())
last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
# check if date is 3 days behind the last day of the month
if run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3):
return True
return False
with DAG(
dag_id='short_example',
schedule_interval="@once",
default_args=default_args,
) as dag:
first = ShortCircuitOperator(
task_id='verify_date',
python_callable=check_if_last_day_of_month
)
second = DummyOperator(task_id='task')
first >> second
示例 运行 2021-01-30
:
示例 运行 2021-01-28
:
注意:确保您比较的是您感兴趣的日期。在示例中,我比较了 DAG 的 execution_date
。
我想将我的 dag 安排在每月最后一天的前 3 天 运行,因此对于 2 月,我的 dag 应该 运行 在 25,而对于 3 月,dag 应该 运行第28天。关于如何安排这个的任何想法?
谢谢
对于气流 >= 2.2.0:
AIP-39 Richer scheduler_interval is available. You can define your own Timetable for the scheduling. There is How to guide customize DAG scheduling with Timetables。您需要通过插件注册时间表并定义调度逻辑。
对于 Airflow < 2.2.0:
只有当您可以在单个 cron 表达式中“说出来”时,您才能安排 DAG。如果您的日程安排愿望不适合 cron 表达式,那么您不能开箱即用。但是,您可以找到一个足够接近您希望的 cron 表达式(0 0 25-31 * *
- 从 28 到 31 的每一天)并在您的 DAG 的开头放置一个 ShortCircuitOperator
验证日期是否实际上是月底前 3 天。如果日期匹配则继续执行下游任务如果日期不匹配则跳过下游任务:
import calendar
from datetime import datetime, date, timedelta
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 8, 21)
}
def check_if_last_day_of_month(execution_date):
# calendar.monthrange return a tuple (weekday of first day of the
# month, number
# of days in month)
run_date = datetime.fromtimestamp(execution_date.timestamp())
last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
# check if date is 3 days behind the last day of the month
if run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3):
return True
return False
with DAG(
dag_id='short_example',
schedule_interval="@once",
default_args=default_args,
) as dag:
first = ShortCircuitOperator(
task_id='verify_date',
python_callable=check_if_last_day_of_month
)
second = DummyOperator(task_id='task')
first >> second
示例 运行 2021-01-30
:
示例 运行 2021-01-28
:
注意:确保您比较的是您感兴趣的日期。在示例中,我比较了 DAG 的 execution_date
。