如何制作一个需要处理今天数据的DAG?

How to make a DAG that needs to process the data from today?

我有一个每天从 7:30pm 开始的 DAG。它需要处理位于 /data/yyyy-mm-dd/ 目录中的文件。 yyyy-mm-dd 是同一天。

如果我使用 execution_date + timedelta(day=1) 它在 DAG 被调度程序 运行 时工作。但是当我使用回填命令时这会中断(我必须将 2019-01-01 给它 运行 用于 2019-01-02)

有没有更好的方法来完成这个?

您的问题听起来对 execution_date 回填有点困惑。回填命令要求您为 运行 中的 DAG 指定备用开始和结束日期。然后它使用 schedule_interval 计算出 运行 中的 运行该范围并传递给他们 execution_date.

所以,你的 schedule_interval 可能看起来像 30 19 * * *。正如您所知,您的 运行 在该间隔结束时通过了间隔的开始,因此 2019-01-01T19:30:00.000 的计划 execution_date 将被触发在 2019- 之后开始01-02T19:30:00.000。那时您似乎希望作业获取 /data/2019-01-02/ 中的数据,这就是为什么要向 execution_date 添加一天并将其格式化为源的原因。

如果您正在 backfilling,它应该以相同的方式运行(而不是改变时间)。因此,给定 -s 2019-01-01 -e 2019-01-02 它将回填一个 运行,该 运行 将在 2019-01-02T19:30:00.000 之后触发,执行日期为 2019-01-01T19:30:00.000,不是吗?

至于其他方法:

  • 您可以将您的 运行 移至午夜,并让他们使用 execution_date 中的日期。但是 4.5 小时的延迟可能不是您想要的。
  • 你看看数据目录是否可以不同命名,我怀疑如果有其他人或工作依赖它们,那会没问题。
  • Airflow 也有一个 next_execution_date,这基本上会给您带来与 execution_date 增加一天相同的结果。但是您可能会喜欢格式化的 macro {{ next_ds }} 来满足您的需要。