Airflow 能否持续访问短期动态生成任务的元数据?

Can Airflow persist access to metadata of short-lived dynamically generated tasks?

我有一个 DAG,只要 FileSensor 检测到文件,就会为每个文件生成任务,以 (1) 将文件移动到临时区域,(2) 触发一个单独的 DAG 来处理文件。

FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
          |-> Move(File2) -> TriggerDAG(File2) -^

在 DAG 定义文件中,中间任务是通过遍历 FileSensor 监视的目录生成的,有点像这样:

# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator

with dag:
  for filepath in Path(WATCH_DIR).glob(*):
    sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)

Move 任务移动导致任务生成的文件,因此下一个 DAG 运行 不会 FileSensor 重新触发 MoveTriggerDAG 个此文件的任务。 事实上,调度程序根本不会为这个文件生成任务,因为在所有文件都经过 Move 之后,输入目录已经没有内容可以迭代了。 .

这会产生两个问题:

  1. 执行后,任务日志和效果图不再可用。 Graph View 仅显示 DAG 现在(空),而不是 运行时间。 (树视图显示任务的 运行 和状态,但单击“正方形”并选择任何细节都会导致 Airflow 错误。)
  2. 由于竞争条件,下游任务可能会出现内存空洞。 第一个任务是将原始文件移动到临时区域。如果这比调度器轮询周期长,调度器不再收集下游 TriggerDAG(File1) 任务,这意味着即使上游任务 运行 成功,任务也不会被调度执行。就好像下游任务从来没有存在过一样

通过将任务序列更改为 Copy(File1) -> TriggerDAG(File1) -> Remove(File1) 解决了竞争条件问题,但更广泛的问题仍然存在:是否有办法持久化动态生成的任务,或者至少有办法通过 Airflow 接口持续访问它们?

虽然不清楚,但我假设您通过编排器 DAG 触发的下游 DAG 不是为每个文件动态生成的(例如 Move 和 TriggerDAG 任务);换句话说,与不断出现和消失(基于文件)的 Move 任务不同,下游 DAG 是静态的并且始终保持在那里


您已经构建了一个相对复杂的工作流,可以执行动态生成任务和触发外部 DAG 等高级操作。我认为稍微修改一下你的 DAGs 结构,你就可以摆脱你的麻烦(这在我看来也是相当先进的)

  1. Move 任务从上游 orchestrator DAG 转移到下游 (per-file) process DAG
  2. 让上游 orchestrator DAG 做两件事
  3. 感知/等待文件出现
  4. 对于每个文件,触发下游 processing DAG(实际上你已经在做)。

对于 orchestrator DAG,您可以采用任何一种方式

  1. 有一个任务为每个文件执行文件感知 + 触发下游 DAG
  2. 有两个任务(我更喜欢这个)
    • 第一个任务检测文件,当它们出现时,在 XCOM
    • 中发布它们的列表
    • 第二个任务读取 XCOM 和 foreach 文件,触发它对应的 DAG

但无论您选择哪种方式,都必须从

复制相关的代码位

这是描述两个任务方法的图表

对标题问题的简短回答是,从 Airflow 1.10.11 开始,不,这似乎不可能如所述。为了呈现 DAG/task 详细信息,Airflow 网络服务器始终会参考 当前 定义并收集到 DagBag 的 DAG 和任务。如果定义改变或消失,那就倒霉了。仪表板仅显示 table 中的日志条目;它不会探查日志中的先验逻辑(它似乎也不会存储除标题之外的大部分内容)。

y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible"。他的解决方案的潜台词:将瞬态元数据转换为 Airflow 按任务存储的内容 运行,但保持任务本身固定。 XCom 是他在这里使用的解决方案,它确实显示在任务实例详细信息/日志中。

Airflow 是否会实现对定义从 DagBag 中消失的稍纵即逝的 one-time 任务的持久接口访问?有可能但不太可能,原因有二:

  1. 这将需要网络服务器在呈现仪表板时探测历史日志,而不仅仅是当前 DagBag,这将需要额外的基础设施来保持网络界面的快速,并且可能使显示非常混乱。
  2. As y2k-shubham notes in a comment to another question of mine转瞬即逝tasks/DAGs是气流anti-pattern。我想这会使下一个功能很难卖。