Airflow - 跨多个文件拆分 DAG 定义

Airflow - Splitting DAG definition across multiple files

刚刚开始使用 Airflow,想知道构建大型 DAG 的最佳实践是什么。对于我们的 ETL,我们有很多属于逻辑分组的任务,但这些组相互依赖。以下哪项被视为最佳做法?

也接受其他建议。

DAG 只是 python 文件。因此,您可以将单个 dag 定义拆分为多个文件。不同的文件应该只包含接收 dag 对象并使用该 dag 对象创建任务的方法。

不过请注意,您应该在全局范围内只有一个 dag 对象。 Airflow 将全局范围内的所有 dag 对象作为单独的 dag。

使每个 dag 尽可能简洁通常被认为是一种很好的做法。但是,如果您需要设置此类依赖项,您可以考虑使用 subdags。更多信息请点击此处:https://airflow.incubator.apache.org/concepts.html?highlight=subdag#scope

您也可以使用 ExternalTask​​Sensor,但请注意,随着 dag 数量的增加,处理任务之间的外部依赖性可能会变得更加困难。我认为子标签可能是适合您的用例的方式。

似乎可以将您的 Python 模块放入 plugins/ 子文件夹并从 DAG 文件导入它们:

https://airflow.apache.org/docs/apache-airflow/stable/plugins.html

随着 TaskGroups in Airflow 2.x, it's worth expanding on 的到来。任务组只是任务的 UI 分组,但它们也可以作为一组相关任务的方便的逻辑分组。 TaskGroup 中的任务可以被捆绑和抽象出来,以便更容易地从更大的部分构建 DAG。话虽如此,拥有一个充满相关任务的文件 而不 将它们捆绑到任务组中可能仍然有用。

分解 DAG 的诀窍是将 DAG 放在一个文件中,例如 my_dag.py,并将任务或任务组的逻辑块放在单独的文件中,每个文件一个逻辑任务块或任务组。每个文件都包含函数(或方法,如果您想采用 OO 方法),每个函数 return 是一个运算符实例或一个 TaskGroup 实例。

为了说明,my_dag.py(下图)从 foo_bar_tasks.py 导入 o​​perator-returning 函数,并从 [=16] 导入 TaskGroup-returning 函数=].在 DAG 上下文中,调用这些函数并将它们的 return 值分配给任务或任务组变量,这些变量可以分配 up-/downstream 依赖项。

dags/my_dag.py:

# std lib imports
 
from airflow import DAG
# other airflow imports
 
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
 
with DAG(dag_id="my_dag", ...) as dag:
 
    # logical chunk of tasks
    foo_task = build_foo_task(dag=dag, ...)
    bar_task = build_bar_task(dag=dag, ...)
 
    # taskgroup
    xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
 
    foo_task >> bar_task >> xyzzy_taskgroup

plugins/includes/foo_bar_tasks.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
 
def build_foo_task(dag: DAG, ...) -> FooOperator:
    # ... logic here ...
    foo_task = FooOperator(..., dag=dag)
 
    return foo_task
 
def build_bar_task(dag: DAG, ...) -> BarOperator:
    # ... logic here ...
    bar_task = BarOperator(..., dag=dag)
 
    return bar_task

plugins/includes/xyzzy_taskgroup.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils import TaskGroup
# other airflow imports
 
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
    xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup")
 
    # ... logic here ...
    baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
 
    # ... logic here ...
    qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
 
    baz_task >> qux_task
 
    return xyzzy_taskgroup