如何参数化来自 UI 的气流中的 DAG?
How to paramaterize DAGs in airflow from UI?
上下文:我定义了一个气流 DAG,它根据名为 org
的参数对实体的某些数据执行操作 compute_metrics
。在下面调用 myapi.compute_metrics(org)
之类的东西。此流程将主要 运行 临时进行。
问题:当我从气流 UI 手动触发 DAG 时,我希望能够 select org
到 运行 气流。
我能想到的最直接的解决方案是生成 n
个不同的 DAG,每个组织一个。 DAG 将具有 id
,如:compute_metrics_1
、compute_metrics_2
等...然后当我需要触发单个 org
的计算指标时,我可以选择该组织的 DAG。这不会随着我添加组织和添加更多类型的计算而扩展。
我做了一些研究,似乎我可以为气流创建一个烧瓶蓝图,据我所知,它扩展了 UI。在这个扩展的 UI 中,我可以添加输入组件,如文本框,用于选择一个组织,然后将其作为 conf
传递给由蓝图手动创建的 DagRun
。那是对的吗?我想象我可以写这样的东西:
session = settings.Session()
execution_date = datetime.now()
run_id = 'external_trigger_' + execution_date.isoformat()
trigger = DagRun(
dag_id='general_compute_metrics_needs_org_id',
run_id=run_id,
state=State.RUNNING,
execution_date=execution_date,
external_trigger=True,
conf=org_ui_component.text) # pass the org id from a component in the blueprint
session.add(trigger)
session.commit() # I don't know if this would actually be scheduled by the scheduler
我的想法合理吗?有没有更好的方法来实现我想要的?
I've done some research and it seems that I can create a flask blueprint for airflow, which to my understanding, extends the UI.
蓝图扩展了 API。如果你想要一些 UI ,你需要提供一个模板视图。实现此目标的最 feature-complete 方法是开发您自己的 Airflow Plugin.
如果要手动创建DagRun
s,可以使用this trigger as reference. For simplicity, I'd trigger a Dag with the API.
特别是关于您的问题,我将有一个 DAG compute_metrics
从 Airflow Variable 读取 org
。它们是全局的,可以动态设置。您可以在变量名称前加上 DagRun id 之类的前缀,以使其唯一并因此 dag-concurrent 安全。
上下文:我定义了一个气流 DAG,它根据名为 org
的参数对实体的某些数据执行操作 compute_metrics
。在下面调用 myapi.compute_metrics(org)
之类的东西。此流程将主要 运行 临时进行。
问题:当我从气流 UI 手动触发 DAG 时,我希望能够 select org
到 运行 气流。
我能想到的最直接的解决方案是生成 n
个不同的 DAG,每个组织一个。 DAG 将具有 id
,如:compute_metrics_1
、compute_metrics_2
等...然后当我需要触发单个 org
的计算指标时,我可以选择该组织的 DAG。这不会随着我添加组织和添加更多类型的计算而扩展。
我做了一些研究,似乎我可以为气流创建一个烧瓶蓝图,据我所知,它扩展了 UI。在这个扩展的 UI 中,我可以添加输入组件,如文本框,用于选择一个组织,然后将其作为 conf
传递给由蓝图手动创建的 DagRun
。那是对的吗?我想象我可以写这样的东西:
session = settings.Session() execution_date = datetime.now() run_id = 'external_trigger_' + execution_date.isoformat() trigger = DagRun( dag_id='general_compute_metrics_needs_org_id', run_id=run_id, state=State.RUNNING, execution_date=execution_date, external_trigger=True, conf=org_ui_component.text) # pass the org id from a component in the blueprint session.add(trigger) session.commit() # I don't know if this would actually be scheduled by the scheduler
我的想法合理吗?有没有更好的方法来实现我想要的?
I've done some research and it seems that I can create a flask blueprint for airflow, which to my understanding, extends the UI.
蓝图扩展了 API。如果你想要一些 UI ,你需要提供一个模板视图。实现此目标的最 feature-complete 方法是开发您自己的 Airflow Plugin.
如果要手动创建DagRun
s,可以使用this trigger as reference. For simplicity, I'd trigger a Dag with the API.
特别是关于您的问题,我将有一个 DAG compute_metrics
从 Airflow Variable 读取 org
。它们是全局的,可以动态设置。您可以在变量名称前加上 DagRun id 之类的前缀,以使其唯一并因此 dag-concurrent 安全。