使用 运行 日期进行每周 Airflow 作业的 eloquent 方法是什么?

What's the eloquent way to use the run date for a weekly Airflow job?

问题:Airflow的execution_date被定义为运行s之间的周期开始。例如,每周计划的 DAG 运行 将在 2018-01-08 T11:00:00 上 运行,但 execution_date 将是 2018-01-01 T11:01:00.

objective:我每周收到一次文件,文件名中有文件日期。为了识别文件,我想使用 Airflow 的 execution_date。但我似乎无法找到一种方法来使用 运行 的日期,而不是使用尽可能早的 execution_date 一段时间。

可能的解决方案

欢迎所有建议或建议。这是一个微妙的问题,但会导致我的 ETL 管道出现一些问题。

另一种可能的解决方案?

  • 每周 运行 一次 DAG "think" 文件将到达。解析着陆区中的文件名,这将为您提供一堆日期。检查并查看这些日期中的哪些日期在 execution_date + schedule_interval 之间(如果您使用的是 >= 1.8 的气流版本,则为 next_execution_date )。然后摄取匹配的file/s。

我认为使用 execution_date + timedelta(days=7) 有点 hacky,intead 使用 execution_date + schedule_interval,如果间隔发生变化,那不应该任何问题(我为我的一个 DAGS 这样做)。如果您使用的是较新的气流版本,那么您可以使用更好的 next_execution_date。

我正在使用宏来解决这个问题。

此函数(用于宏)也可以处理手动触发。

def weekly_today(execution_date, run_id, years=0, months=0, days=0, fmt="%Y%m%d"):
    d = pendulum.instance(execution_date)
    if run_id.startswith('scheduled_'):
        d = d.add(days=7)
    return d.add(years=years, months=months, days=days).strftime(fmt)

这个函数应该作为 user_defined_macros

添加到 DAG
dag = DAG(
    dag_id='test',
    start_date=timezone.datetime(2019, 6, 24, 6),
    schedule_interval=timedelta(days=7),
    user_defined_macros={
        'weekly_today': weekly_today
    },
)

而且我需要设置从1年前到今天的数据范围。 这是示例宏用法。

from_macro = '{{ weekly_today(execution_date, run_id, years=-1) }}'
to_macro = '{{ weekly_today(execution_date, run_id) }}'

错误的命名..但有效。