Airflow DAG 每天运行(周末除外)?
Airflow DAG Runs Daily Excluding Weekends?
是否可以创建除周六和周日外每天运行的 Airflow DAG?这似乎是不可能的,因为你只有一个 start_date 和一个 schedule_interval.
我正在设置一个每天早上处理一批文件的工作流程。这些文件不会出现在周末,尽管只有周一到周五。我可以简单地使用 24 小时的超时设置,这实际上使周六和周日超时,因为该文件永远不会在那些日子出现,但这会将 DAG 标记为这两天失败,这将是非常愉快的。
'schedule_interval': '0 0 * * 1-5'
在每周一到周五的每一天 00:00 运行。
我有类似的需求,最后把它放在我的 dags 的开头 -- 它类似于 ShortCircuitOperator。
import logging
from airflow.models import SkipMixin, BaseOperator
from airflow.utils.decorators import apply_defaults
class pull_file_decision_operator(BaseOperator, SkipMixin):
template_fields = ('execution_date',)
@apply_defaults
def __init__(self,
day_of_week,
hour_of_day,
execution_date,
**kwargs):
self.day_of_week = day_of_week
self.hour_of_week = hour_of_day
self.execution_date = execution_date
def execute(self, context):
# https://docs.python.org/3/library/datetime.html#datetime.date.weekday
run_dt = self.execution_date
dow = self.day_of_week
hod = self.hour_of_day
if run_dt.weekday() == dow and run_dt.hour == hod:
return True
else:
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
logging.info('Skipping downstream tasks...')
logging.info("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'],
context['ti'].execution_date,
downstream_tasks)
Zack 的答案已经有一个工作日的 cron 时间表,可以满足您的要求 (0 0 * * 1-5
),但我想添加一个带有常见 cron 时间表示例网站的答案,咳咳,crontab 表达式.
我经常将它与 Airflow 结合使用,以得出 DAG 的 schedule_interval
。
帮助您交互式设计 cron 计划的主要应用程序位于 crontab.guru。
仅在工作日安排的示例 - https://crontab.guru/every-weekday
更常见的示例(例如,每半小时、每季度等)- https://crontab.guru/examples.html
是否可以创建除周六和周日外每天运行的 Airflow DAG?这似乎是不可能的,因为你只有一个 start_date 和一个 schedule_interval.
我正在设置一个每天早上处理一批文件的工作流程。这些文件不会出现在周末,尽管只有周一到周五。我可以简单地使用 24 小时的超时设置,这实际上使周六和周日超时,因为该文件永远不会在那些日子出现,但这会将 DAG 标记为这两天失败,这将是非常愉快的。
'schedule_interval': '0 0 * * 1-5'
在每周一到周五的每一天 00:00 运行。
我有类似的需求,最后把它放在我的 dags 的开头 -- 它类似于 ShortCircuitOperator。
import logging
from airflow.models import SkipMixin, BaseOperator
from airflow.utils.decorators import apply_defaults
class pull_file_decision_operator(BaseOperator, SkipMixin):
template_fields = ('execution_date',)
@apply_defaults
def __init__(self,
day_of_week,
hour_of_day,
execution_date,
**kwargs):
self.day_of_week = day_of_week
self.hour_of_week = hour_of_day
self.execution_date = execution_date
def execute(self, context):
# https://docs.python.org/3/library/datetime.html#datetime.date.weekday
run_dt = self.execution_date
dow = self.day_of_week
hod = self.hour_of_day
if run_dt.weekday() == dow and run_dt.hour == hod:
return True
else:
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
logging.info('Skipping downstream tasks...')
logging.info("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'],
context['ti'].execution_date,
downstream_tasks)
Zack 的答案已经有一个工作日的 cron 时间表,可以满足您的要求 (0 0 * * 1-5
),但我想添加一个带有常见 cron 时间表示例网站的答案,咳咳,crontab 表达式.
我经常将它与 Airflow 结合使用,以得出 DAG 的 schedule_interval
。
帮助您交互式设计 cron 计划的主要应用程序位于 crontab.guru。
仅在工作日安排的示例 - https://crontab.guru/every-weekday
更常见的示例(例如,每半小时、每季度等)- https://crontab.guru/examples.html