在下一个工作日将每月 DAG 安排到 运行
schedule a monthly DAG to run on the next weekday
我必须在每个月的 15 日安排一个应该 运行 的 DAG。但是,如果 15 号落在 Sunday/Saturday 上,则 DAG 应该跳过周末并在下周一跳过 运行。
例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日,即星期一 运行,而不是在 5 月 15 日 运行ning。
你能帮忙安排一下airflow吗?
提前致谢!
调度逻辑受限于您可以使用单个 cron 表达式执行的操作。所以如果你不能在 cron 表达式中说出来,你就不能在 Airflow 中提供这样的调度。出于这个原因,有一个开放式气流改进提案 AIP-39 Richer scheduler_interval 以提供更多调度功能。
也就是说,您仍然可以通过编写一些代码来获得所需的功能。
您可以将 dag 设置为在每个月的 15 日开始,然后放置一个传感器来验证日期是否为周一至周五(如果不是,它将等待):
from airflow.sensors.weekday import DayOfWeekSensor
dag = DAG(
dag_id='work',
schedule_interval='0 0 15 * *',
default_args=default_args,
description='Schedule a Job on 15 of each month',
)
weekend_check = DayOfWeekSensor(
task_id='weekday_check_task',
week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
mode='reschedule',
dag=dag)
op_1 = YourOperator(task_id='op1_task',dag=dag)
weekend_check >> op_1
注意:如果您是 运行 airflow<2.0.0
,则需要将导入更改为:
from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
Elad 发布的答案非常有效。我想出了另一个同样有效的解决方案。
我将工作安排在本月的 15、16 和 17 日 运行。但是,我添加了一个条件,如果是工作日,工作 运行 会在 15 号完成。工作 运行 如果是星期一,则在 16 号和 17 号。
为此,我添加了一个 BranchPythonOperator:
from airflow.operators.python_operator import BranchPythonOperator
def _conditinal_task_initiator(**kwargs):
execution_date=kwargs['execution_date']
if int(datetime.strftime(execution_date,'%d'))==15 and (execution_date.weekday()<5):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==16 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==17 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
else:
return 'dummy_task_skip_cmo_longit'
with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT',default_args = default_args, schedule_interval = "0 8 15-17 * *", catchup=False) as dag:
conditinal_task_initiator=BranchPythonOperator(
task_id='cond_task_check_day',
provide_context=True,
python_callable=_conditinal_task_initiator,
do_xcom_push=False)
dummy_task_run_cmo_longit=DummyOperator(
task_id='dummy_task_run_cmo_longit')
dummy_task_skip_cmo_longit=DummyOperator(
task_id='dummy_task_skip_cmo_longit')
conditinal_task_initiator >> [dummy_task_run_cmo_longit,dummy_task_skip_cmo_longit]
dummy_task_run_cmo_longit >> <main tasks for execution>
使用它,作业将在每个月的 15、16 和 17 运行。但是,它 运行 每个月只会执行一次功能性任务。
我必须在每个月的 15 日安排一个应该 运行 的 DAG。但是,如果 15 号落在 Sunday/Saturday 上,则 DAG 应该跳过周末并在下周一跳过 运行。
例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日,即星期一 运行,而不是在 5 月 15 日 运行ning。
你能帮忙安排一下airflow吗?
提前致谢!
调度逻辑受限于您可以使用单个 cron 表达式执行的操作。所以如果你不能在 cron 表达式中说出来,你就不能在 Airflow 中提供这样的调度。出于这个原因,有一个开放式气流改进提案 AIP-39 Richer scheduler_interval 以提供更多调度功能。
也就是说,您仍然可以通过编写一些代码来获得所需的功能。 您可以将 dag 设置为在每个月的 15 日开始,然后放置一个传感器来验证日期是否为周一至周五(如果不是,它将等待):
from airflow.sensors.weekday import DayOfWeekSensor
dag = DAG(
dag_id='work',
schedule_interval='0 0 15 * *',
default_args=default_args,
description='Schedule a Job on 15 of each month',
)
weekend_check = DayOfWeekSensor(
task_id='weekday_check_task',
week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
mode='reschedule',
dag=dag)
op_1 = YourOperator(task_id='op1_task',dag=dag)
weekend_check >> op_1
注意:如果您是 运行 airflow<2.0.0
,则需要将导入更改为:
from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor
Elad 发布的答案非常有效。我想出了另一个同样有效的解决方案。
我将工作安排在本月的 15、16 和 17 日 运行。但是,我添加了一个条件,如果是工作日,工作 运行 会在 15 号完成。工作 运行 如果是星期一,则在 16 号和 17 号。
为此,我添加了一个 BranchPythonOperator:
from airflow.operators.python_operator import BranchPythonOperator
def _conditinal_task_initiator(**kwargs):
execution_date=kwargs['execution_date']
if int(datetime.strftime(execution_date,'%d'))==15 and (execution_date.weekday()<5):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==16 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==17 and (execution_date.weekday()==0):
return 'dummy_task_run_cmo_longit'
else:
return 'dummy_task_skip_cmo_longit'
with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT',default_args = default_args, schedule_interval = "0 8 15-17 * *", catchup=False) as dag:
conditinal_task_initiator=BranchPythonOperator(
task_id='cond_task_check_day',
provide_context=True,
python_callable=_conditinal_task_initiator,
do_xcom_push=False)
dummy_task_run_cmo_longit=DummyOperator(
task_id='dummy_task_run_cmo_longit')
dummy_task_skip_cmo_longit=DummyOperator(
task_id='dummy_task_skip_cmo_longit')
conditinal_task_initiator >> [dummy_task_run_cmo_longit,dummy_task_skip_cmo_longit]
dummy_task_run_cmo_longit >> <main tasks for execution>
使用它,作业将在每个月的 15、16 和 17 运行。但是,它 运行 每个月只会执行一次功能性任务。