Airflow DAG 每天运行(周末除外)?

Airflow DAG Runs Daily Excluding Weekends?

是否可以创建除周六和周日外每天运行的 Airflow DAG?这似乎是不可能的,因为你只有一个 start_date 和一个 schedule_interval.

我正在设置一个每天早上处理一批文件的工作流程。这些文件不会出现在周末,尽管只有周一到周五。我可以简单地使用 24 小时的超时设置,这实际上使周六和周日超时,因为该文件永远不会在那些日子出现,但这会将 DAG 标记为这两天失败,这将是非常愉快的。

'schedule_interval': '0 0 * * 1-5' 在每周一到周五的每一天 00:00 运行。

我有类似的需求,最后把它放在我的 dags 的开头 -- 它类似于 ShortCircuitOperator。

import logging
from airflow.models import SkipMixin, BaseOperator
from airflow.utils.decorators import apply_defaults


class pull_file_decision_operator(BaseOperator, SkipMixin):

   template_fields = ('execution_date',)

   @apply_defaults
   def __init__(self,
                day_of_week, 
                hour_of_day,
                execution_date,
                **kwargs):

       self.day_of_week = day_of_week
       self.hour_of_week = hour_of_day
       self.execution_date = execution_date

   def execute(self, context):
       # https://docs.python.org/3/library/datetime.html#datetime.date.weekday

       run_dt = self.execution_date
       dow = self.day_of_week
       hod = self.hour_of_day

       if run_dt.weekday() == dow and run_dt.hour == hod:
          return True
       else:
          downstream_tasks = context['task'].get_flat_relatives(upstream=False)
          logging.info('Skipping downstream tasks...')
          logging.info("Downstream task_ids %s", downstream_tasks)

          if downstream_tasks:
             self.skip(context['dag_run'],
                       context['ti'].execution_date,
                       downstream_tasks)

Zack 的答案已经有一个工作日的 cron 时间表,可以满足您的要求 (0 0 * * 1-5),但我想添加一个带有常见 cron 时间表示例网站的答案,咳咳,crontab 表达式.

我经常将它与 Airflow 结合使用,以得出 DAG 的 schedule_interval

帮助您交互式设计 cron 计划的主要应用程序位于 crontab.guru

仅在工作日安排的示例 - https://crontab.guru/every-weekday

更常见的示例(例如,每半小时、每季度等)- https://crontab.guru/examples.html