在下一个工作日将每月 DAG 安排到 运行

schedule a monthly DAG to run on the next weekday

我必须在每个月的 15 日安排一个应该 运行 的 DAG。但是,如果 15 号落在 Sunday/Saturday 上,则 DAG 应该跳过周末并在下周一跳过 运行。

例如,2021 年 5 月 15 日是星期六。因此,DAG 应该在 17 日,即星期一 运行,而不是在 5 月 15 日 运行ning。

你能帮忙安排一下airflow吗?

提前致谢!

调度逻辑受限于您可以使用单个 cron 表达式执行的操作。所以如果你不能在 cron 表达式中说出来,你就不能在 Airflow 中提供这样的调度。出于这个原因,有一个开放式气流改进提案 AIP-39 Richer scheduler_interval 以提供更多调度功能。

也就是说,您仍然可以通过编写一些代码来获得所需的功能。 您可以将 dag 设置为在每个月的 15 日开始,然后放置一个传感器来验证日期是否为周一至周五(如果不是,它将等待):

from airflow.sensors.weekday import DayOfWeekSensor
dag = DAG(
    dag_id='work',
    schedule_interval='0 0 15 * *',
    default_args=default_args,
    description='Schedule a Job on 15 of each month',
)

weekend_check = DayOfWeekSensor(
    task_id='weekday_check_task',
    week_day={'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'},
    mode='reschedule',
    dag=dag)

op_1 = YourOperator(task_id='op1_task',dag=dag)

weekend_check >> op_1

注意:如果您是 运行 airflow<2.0.0,则需要将导入更改为:

from airflow.contrib.sensors.weekday_sensor import DayOfWeekSensor

Elad 发布的答案非常有效。我想出了另一个同样有效的解决方案。

我将工作安排在本月的 15、16 和 17 日 运行。但是,我添加了一个条件,如果是工作日,工作 运行 会在 15 号完成。工作 运行 如果是星期一,则在 16 号和 17 号。

为此,我添加了一个 BranchPythonOperator:

from airflow.operators.python_operator import BranchPythonOperator

def _conditinal_task_initiator(**kwargs):
execution_date=kwargs['execution_date']
if int(datetime.strftime(execution_date,'%d'))==15 and (execution_date.weekday()<5):
    return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==16 and (execution_date.weekday()==0):
    return 'dummy_task_run_cmo_longit'
elif int(datetime.strftime(execution_date,'%d'))==17 and (execution_date.weekday()==0):
    return 'dummy_task_run_cmo_longit'
else:
    return 'dummy_task_skip_cmo_longit'

with DAG(dag_id='NXS_FM_LOAD_CMO_CHOICE_LONGIT',default_args = default_args, schedule_interval = "0 8 15-17 * *", catchup=False) as dag:
conditinal_task_initiator=BranchPythonOperator(
    task_id='cond_task_check_day',
    provide_context=True,
    python_callable=_conditinal_task_initiator,
    do_xcom_push=False)
dummy_task_run_cmo_longit=DummyOperator(
    task_id='dummy_task_run_cmo_longit')
dummy_task_skip_cmo_longit=DummyOperator(
    task_id='dummy_task_skip_cmo_longit')

conditinal_task_initiator >> [dummy_task_run_cmo_longit,dummy_task_skip_cmo_longit]

dummy_task_run_cmo_longit >> <main tasks for execution>

使用它,作业将在每个月的 15、16 和 17 运行。但是,它 运行 每个月只会执行一次功能性任务。