Airflow 能否持续访问短期动态生成任务的元数据?
Can Airflow persist access to metadata of short-lived dynamically generated tasks?
我有一个 DAG,只要 FileSensor
检测到文件,就会为每个文件生成任务,以 (1) 将文件移动到临时区域,(2) 触发一个单独的 DAG 来处理文件。
FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
|-> Move(File2) -> TriggerDAG(File2) -^
在 DAG 定义文件中,中间任务是通过遍历 FileSensor 监视的目录生成的,有点像这样:
# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator
with dag:
for filepath in Path(WATCH_DIR).glob(*):
sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)
Move
任务移动导致任务生成的文件,因此下一个 DAG 运行 不会 FileSensor
重新触发 Move
或TriggerDAG
个此文件的任务。 事实上,调度程序根本不会为这个文件生成任务,因为在所有文件都经过 Move
之后,输入目录已经没有内容可以迭代了。 .
这会产生两个问题:
- 执行后,任务日志和效果图不再可用。 Graph View 仅显示 DAG 现在(空),而不是 运行时间。 (树视图显示任务的 运行 和状态,但单击“正方形”并选择任何细节都会导致 Airflow 错误。)
- 由于竞争条件,下游任务可能会出现内存空洞。 第一个任务是将原始文件移动到临时区域。如果这比调度器轮询周期长,调度器不再收集下游
TriggerDAG(File1)
任务,这意味着即使上游任务 运行 成功,任务也不会被调度执行。就好像下游任务从来没有存在过一样
通过将任务序列更改为 Copy(File1) -> TriggerDAG(File1) -> Remove(File1)
解决了竞争条件问题,但更广泛的问题仍然存在:是否有办法持久化动态生成的任务,或者至少有办法通过 Airflow 接口持续访问它们?
虽然不清楚,但我假设您通过编排器 DAG 触发的下游 DAG 不是为每个文件动态生成的(例如 Move 和 TriggerDAG 任务);换句话说,与不断出现和消失(基于文件)的 Move 任务不同,下游 DAG 是静态的并且始终保持在那里
您已经构建了一个相对复杂的工作流,可以执行动态生成任务和触发外部 DAG 等高级操作。我认为稍微修改一下你的 DAGs 结构,你就可以摆脱你的麻烦(这在我看来也是相当先进的)
- 将
Move
任务从上游 orchestrator DAG 转移到下游 (per-file) process DAG
- 让上游 orchestrator DAG 做两件事
- 感知/等待文件出现
- 对于每个文件,触发下游 processing DAG(实际上你已经在做)。
对于 orchestrator DAG,您可以采用任何一种方式
- 有一个任务为每个文件执行文件感知 + 触发下游 DAG
- 有两个任务(我更喜欢这个)
- 第一个任务检测文件,当它们出现时,在 XCOM
中发布它们的列表
- 第二个任务读取 XCOM 和 foreach 文件,触发它对应的 DAG
但无论您选择哪种方式,都必须从
复制相关的代码位
FileSensor
(为了能够感知文件然后在XCOM
中公布他们的名字)和
TriggerDagRunOperator
(以便能够用一个任务触发多个DAG)
这是描述两个任务方法的图表
对标题问题的简短回答是,从 Airflow 1.10.11 开始,不,这似乎不可能如所述。为了呈现 DAG/task 详细信息,Airflow 网络服务器始终会参考 当前 定义并收集到 DagBag
的 DAG 和任务。如果定义改变或消失,那就倒霉了。仪表板仅显示 table 中的日志条目;它不会探查日志中的先验逻辑(它似乎也不会存储除标题之外的大部分内容)。
y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible"。他的解决方案的潜台词:将瞬态元数据转换为 Airflow 按任务存储的内容 运行,但保持任务本身固定。 XCom 是他在这里使用的解决方案,它确实显示在任务实例详细信息/日志中。
Airflow 是否会实现对定义从 DagBag
中消失的稍纵即逝的 one-time 任务的持久接口访问?有可能但不太可能,原因有二:
- 这将需要网络服务器在呈现仪表板时探测历史日志,而不仅仅是当前
DagBag
,这将需要额外的基础设施来保持网络界面的快速,并且可能使显示非常混乱。
- As y2k-shubham notes in a comment to another question of mine,转瞬即逝tasks/DAGs是气流anti-pattern。我想这会使下一个功能很难卖。
我有一个 DAG,只要 FileSensor
检测到文件,就会为每个文件生成任务,以 (1) 将文件移动到临时区域,(2) 触发一个单独的 DAG 来处理文件。
FileSensor -> Move(File1) -> TriggerDAG(File1) -> Done
|-> Move(File2) -> TriggerDAG(File2) -^
在 DAG 定义文件中,中间任务是通过遍历 FileSensor 监视的目录生成的,有点像这样:
# def generate_move_task(f: Path) -> BashOperator
# def generate_dag_trigger(f: Path) -> TriggerDagRunOperator
with dag:
for filepath in Path(WATCH_DIR).glob(*):
sensor_task >> generate_move_task(filepath) >> generate_dag_trigger(filepath)
Move
任务移动导致任务生成的文件,因此下一个 DAG 运行 不会 FileSensor
重新触发 Move
或TriggerDAG
个此文件的任务。 事实上,调度程序根本不会为这个文件生成任务,因为在所有文件都经过 Move
之后,输入目录已经没有内容可以迭代了。 .
这会产生两个问题:
- 执行后,任务日志和效果图不再可用。 Graph View 仅显示 DAG 现在(空),而不是 运行时间。 (树视图显示任务的 运行 和状态,但单击“正方形”并选择任何细节都会导致 Airflow 错误。)
- 由于竞争条件,下游任务可能会出现内存空洞。 第一个任务是将原始文件移动到临时区域。如果这比调度器轮询周期长,调度器不再收集下游
TriggerDAG(File1)
任务,这意味着即使上游任务 运行 成功,任务也不会被调度执行。就好像下游任务从来没有存在过一样
通过将任务序列更改为 Copy(File1) -> TriggerDAG(File1) -> Remove(File1)
解决了竞争条件问题,但更广泛的问题仍然存在:是否有办法持久化动态生成的任务,或者至少有办法通过 Airflow 接口持续访问它们?
虽然不清楚,但我假设您通过编排器 DAG 触发的下游 DAG 不是为每个文件动态生成的(例如 Move 和 TriggerDAG 任务);换句话说,与不断出现和消失(基于文件)的 Move 任务不同,下游 DAG 是静态的并且始终保持在那里
您已经构建了一个相对复杂的工作流,可以执行动态生成任务和触发外部 DAG 等高级操作。我认为稍微修改一下你的 DAGs 结构,你就可以摆脱你的麻烦(这在我看来也是相当先进的)
- 将
Move
任务从上游 orchestrator DAG 转移到下游 (per-file) process DAG - 让上游 orchestrator DAG 做两件事
- 感知/等待文件出现
- 对于每个文件,触发下游 processing DAG(实际上你已经在做)。
对于 orchestrator DAG,您可以采用任何一种方式
- 有一个任务为每个文件执行文件感知 + 触发下游 DAG
- 有两个任务(我更喜欢这个)
- 第一个任务检测文件,当它们出现时,在 XCOM 中发布它们的列表
- 第二个任务读取 XCOM 和 foreach 文件,触发它对应的 DAG
但无论您选择哪种方式,都必须从
复制相关的代码位FileSensor
(为了能够感知文件然后在XCOM
中公布他们的名字)和TriggerDagRunOperator
(以便能够用一个任务触发多个DAG)
这是描述两个任务方法的图表
对标题问题的简短回答是,从 Airflow 1.10.11 开始,不,这似乎不可能如所述。为了呈现 DAG/task 详细信息,Airflow 网络服务器始终会参考 当前 定义并收集到 DagBag
的 DAG 和任务。如果定义改变或消失,那就倒霉了。仪表板仅显示 table 中的日志条目;它不会探查日志中的先验逻辑(它似乎也不会存储除标题之外的大部分内容)。
y2k-shubham provides an excellent solution to the unspoken question of "how can I write DAGs/tasks so that the transient metadata are accessible"。他的解决方案的潜台词:将瞬态元数据转换为 Airflow 按任务存储的内容 运行,但保持任务本身固定。 XCom 是他在这里使用的解决方案,它确实显示在任务实例详细信息/日志中。
Airflow 是否会实现对定义从 DagBag
中消失的稍纵即逝的 one-time 任务的持久接口访问?有可能但不太可能,原因有二:
- 这将需要网络服务器在呈现仪表板时探测历史日志,而不仅仅是当前
DagBag
,这将需要额外的基础设施来保持网络界面的快速,并且可能使显示非常混乱。 - As y2k-shubham notes in a comment to another question of mine,转瞬即逝tasks/DAGs是气流anti-pattern。我想这会使下一个功能很难卖。