使用 运行 日期进行每周 Airflow 作业的 eloquent 方法是什么?
What's the eloquent way to use the run date for a weekly Airflow job?
问题:Airflow的execution_date
被定义为运行s之间的周期开始。例如,每周计划的 DAG 运行 将在 2018-01-08 T11:00:00 上 运行,但 execution_date
将是 2018-01-01 T11:01:00.
objective:我每周收到一次文件,文件名中有文件日期。为了识别文件,我想使用 Airflow 的 execution_date
。但我似乎无法找到一种方法来使用 运行 的日期,而不是使用尽可能早的 execution_date
一段时间。
可能的解决方案:
- 即时修改
execution_date
。类似于:context['execution_date'] + timedelta(days=7)
。这看起来很老套。
- 运行每日DAG,在DAG执行图的开头插入一个
ShortCircuitOperator
,如果execution_date
不是预期日期则退出
欢迎所有建议或建议。这是一个微妙的问题,但会导致我的 ETL 管道出现一些问题。
另一种可能的解决方案?
- 每周 运行 一次 DAG "think" 文件将到达。解析着陆区中的文件名,这将为您提供一堆日期。检查并查看这些日期中的哪些日期在 execution_date + schedule_interval 之间(如果您使用的是 >= 1.8 的气流版本,则为 next_execution_date )。然后摄取匹配的file/s。
我认为使用 execution_date + timedelta(days=7) 有点 hacky,intead 使用 execution_date + schedule_interval,如果间隔发生变化,那不应该任何问题(我为我的一个 DAGS 这样做)。如果您使用的是较新的气流版本,那么您可以使用更好的 next_execution_date。
我正在使用宏来解决这个问题。
此函数(用于宏)也可以处理手动触发。
def weekly_today(execution_date, run_id, years=0, months=0, days=0, fmt="%Y%m%d"):
d = pendulum.instance(execution_date)
if run_id.startswith('scheduled_'):
d = d.add(days=7)
return d.add(years=years, months=months, days=days).strftime(fmt)
这个函数应该作为 user_defined_macros
添加到 DAG
dag = DAG(
dag_id='test',
start_date=timezone.datetime(2019, 6, 24, 6),
schedule_interval=timedelta(days=7),
user_defined_macros={
'weekly_today': weekly_today
},
)
而且我需要设置从1年前到今天的数据范围。
这是示例宏用法。
from_macro = '{{ weekly_today(execution_date, run_id, years=-1) }}'
to_macro = '{{ weekly_today(execution_date, run_id) }}'
错误的命名..但有效。
问题:Airflow的execution_date
被定义为运行s之间的周期开始。例如,每周计划的 DAG 运行 将在 2018-01-08 T11:00:00 上 运行,但 execution_date
将是 2018-01-01 T11:01:00.
objective:我每周收到一次文件,文件名中有文件日期。为了识别文件,我想使用 Airflow 的 execution_date
。但我似乎无法找到一种方法来使用 运行 的日期,而不是使用尽可能早的 execution_date
一段时间。
可能的解决方案:
- 即时修改
execution_date
。类似于:context['execution_date'] + timedelta(days=7)
。这看起来很老套。 - 运行每日DAG,在DAG执行图的开头插入一个
ShortCircuitOperator
,如果execution_date
不是预期日期则退出
欢迎所有建议或建议。这是一个微妙的问题,但会导致我的 ETL 管道出现一些问题。
另一种可能的解决方案?
- 每周 运行 一次 DAG "think" 文件将到达。解析着陆区中的文件名,这将为您提供一堆日期。检查并查看这些日期中的哪些日期在 execution_date + schedule_interval 之间(如果您使用的是 >= 1.8 的气流版本,则为 next_execution_date )。然后摄取匹配的file/s。
我认为使用 execution_date + timedelta(days=7) 有点 hacky,intead 使用 execution_date + schedule_interval,如果间隔发生变化,那不应该任何问题(我为我的一个 DAGS 这样做)。如果您使用的是较新的气流版本,那么您可以使用更好的 next_execution_date。
我正在使用宏来解决这个问题。
此函数(用于宏)也可以处理手动触发。
def weekly_today(execution_date, run_id, years=0, months=0, days=0, fmt="%Y%m%d"):
d = pendulum.instance(execution_date)
if run_id.startswith('scheduled_'):
d = d.add(days=7)
return d.add(years=years, months=months, days=days).strftime(fmt)
这个函数应该作为 user_defined_macros
dag = DAG(
dag_id='test',
start_date=timezone.datetime(2019, 6, 24, 6),
schedule_interval=timedelta(days=7),
user_defined_macros={
'weekly_today': weekly_today
},
)
而且我需要设置从1年前到今天的数据范围。 这是示例宏用法。
from_macro = '{{ weekly_today(execution_date, run_id, years=-1) }}'
to_macro = '{{ weekly_today(execution_date, run_id) }}'
错误的命名..但有效。