如何参数化来自 UI 的气流中的 DAG?

How to paramaterize DAGs in airflow from UI?

上下文:我定义了一个气流 DAG,它根据名为 org 的参数对实体的某些数据执行操作 compute_metrics。在下面调用 myapi.compute_metrics(org) 之类的东西。此流程将主要 运行 临时进行。

问题:当我从气流 UI 手动触发 DAG 时,我希望能够 select org 到 运行 气流。

我能想到的最直接的解决方案是生成 n 个不同的 DAG,每个组织一个。 DAG 将具有 id,如:compute_metrics_1compute_metrics_2 等...然后当我需要触发单个 org 的计算指标时,我可以选择该组织的 DAG。这不会随着我添加组织和添加更多类型的计算而扩展。

我做了一些研究,似乎我可以为气流创建一个烧瓶蓝图,据我所知,它扩展了 UI。在这个扩展的 UI 中,我可以添加输入组件,如文本框,用于选择一个组织,然后将其作为 conf 传递给由蓝图手动创建的 DagRun。那是对的吗?我想象我可以写这样的东西:

session = settings.Session()

execution_date = datetime.now()
run_id = 'external_trigger_' + execution_date.isoformat()

trigger = DagRun(
    dag_id='general_compute_metrics_needs_org_id',
    run_id=run_id,
    state=State.RUNNING,
    execution_date=execution_date,
    external_trigger=True,
    conf=org_ui_component.text) # pass the org id from a component in the blueprint
session.add(trigger)
session.commit() # I don't know if this would actually be scheduled by the scheduler

我的想法合理吗?有没有更好的方法来实现我想要的?

I've done some research and it seems that I can create a flask blueprint for airflow, which to my understanding, extends the UI.

蓝图扩展了 API。如果你想要一些 UI ,你需要提供一个模板视图。实现此目标的最 feature-complete 方法是开发您自己的 Airflow Plugin.

如果要手动创建DagRuns,可以使用this trigger as reference. For simplicity, I'd trigger a Dag with the API.

特别是关于您的问题,我将有一个 DAG compute_metrics 从 Airflow Variable 读取 org。它们是全局的,可以动态设置。您可以在变量名称前加上 DagRun id 之类的前缀,以使其唯一并因此 dag-concurrent 安全。