Airflow - 跨多个文件拆分 DAG 定义
Airflow - Splitting DAG definition across multiple files
刚刚开始使用 Airflow,想知道构建大型 DAG 的最佳实践是什么。对于我们的 ETL,我们有很多属于逻辑分组的任务,但这些组相互依赖。以下哪项被视为最佳做法?
- 一个包含所有任务的大型 DAG 文件
- 跨多个文件拆分 DAG 定义(如何操作?)
- 定义多个 DAG,每组任务一个,并使用 ExternalTaskSensor 设置它们之间的依赖关系
也接受其他建议。
DAG 只是 python 文件。因此,您可以将单个 dag 定义拆分为多个文件。不同的文件应该只包含接收 dag 对象并使用该 dag 对象创建任务的方法。
不过请注意,您应该在全局范围内只有一个 dag 对象。 Airflow 将全局范围内的所有 dag 对象作为单独的 dag。
使每个 dag 尽可能简洁通常被认为是一种很好的做法。但是,如果您需要设置此类依赖项,您可以考虑使用 subdags。更多信息请点击此处:https://airflow.incubator.apache.org/concepts.html?highlight=subdag#scope
您也可以使用 ExternalTaskSensor,但请注意,随着 dag 数量的增加,处理任务之间的外部依赖性可能会变得更加困难。我认为子标签可能是适合您的用例的方式。
似乎可以将您的 Python 模块放入 plugins/
子文件夹并从 DAG 文件导入它们:
https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
随着 TaskGroups in Airflow 2.x, it's worth expanding on 的到来。任务组只是任务的 UI 分组,但它们也可以作为一组相关任务的方便的逻辑分组。 TaskGroup 中的任务可以被捆绑和抽象出来,以便更容易地从更大的部分构建 DAG。话虽如此,拥有一个充满相关任务的文件 而不 将它们捆绑到任务组中可能仍然有用。
分解 DAG 的诀窍是将 DAG 放在一个文件中,例如 my_dag.py
,并将任务或任务组的逻辑块放在单独的文件中,每个文件一个逻辑任务块或任务组。每个文件都包含函数(或方法,如果您想采用 OO 方法),每个函数 return 是一个运算符实例或一个 TaskGroup 实例。
为了说明,my_dag.py
(下图)从 foo_bar_tasks.py
导入 operator-returning 函数,并从 [=16] 导入 TaskGroup-returning 函数=].在 DAG 上下文中,调用这些函数并将它们的 return 值分配给任务或任务组变量,这些变量可以分配 up-/downstream 依赖项。
dags/my_dag.py
:
# std lib imports
from airflow import DAG
# other airflow imports
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
with DAG(dag_id="my_dag", ...) as dag:
# logical chunk of tasks
foo_task = build_foo_task(dag=dag, ...)
bar_task = build_bar_task(dag=dag, ...)
# taskgroup
xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
foo_task >> bar_task >> xyzzy_taskgroup
plugins/includes/foo_bar_tasks.py
:
# std lib imports
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
def build_foo_task(dag: DAG, ...) -> FooOperator:
# ... logic here ...
foo_task = FooOperator(..., dag=dag)
return foo_task
def build_bar_task(dag: DAG, ...) -> BarOperator:
# ... logic here ...
bar_task = BarOperator(..., dag=dag)
return bar_task
plugins/includes/xyzzy_taskgroup.py
:
# std lib imports
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils import TaskGroup
# other airflow imports
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup")
# ... logic here ...
baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
# ... logic here ...
qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
baz_task >> qux_task
return xyzzy_taskgroup
刚刚开始使用 Airflow,想知道构建大型 DAG 的最佳实践是什么。对于我们的 ETL,我们有很多属于逻辑分组的任务,但这些组相互依赖。以下哪项被视为最佳做法?
- 一个包含所有任务的大型 DAG 文件
- 跨多个文件拆分 DAG 定义(如何操作?)
- 定义多个 DAG,每组任务一个,并使用 ExternalTaskSensor 设置它们之间的依赖关系
也接受其他建议。
DAG 只是 python 文件。因此,您可以将单个 dag 定义拆分为多个文件。不同的文件应该只包含接收 dag 对象并使用该 dag 对象创建任务的方法。
不过请注意,您应该在全局范围内只有一个 dag 对象。 Airflow 将全局范围内的所有 dag 对象作为单独的 dag。
使每个 dag 尽可能简洁通常被认为是一种很好的做法。但是,如果您需要设置此类依赖项,您可以考虑使用 subdags。更多信息请点击此处:https://airflow.incubator.apache.org/concepts.html?highlight=subdag#scope
您也可以使用 ExternalTaskSensor,但请注意,随着 dag 数量的增加,处理任务之间的外部依赖性可能会变得更加困难。我认为子标签可能是适合您的用例的方式。
似乎可以将您的 Python 模块放入 plugins/
子文件夹并从 DAG 文件导入它们:
https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
随着 TaskGroups in Airflow 2.x, it's worth expanding on
分解 DAG 的诀窍是将 DAG 放在一个文件中,例如 my_dag.py
,并将任务或任务组的逻辑块放在单独的文件中,每个文件一个逻辑任务块或任务组。每个文件都包含函数(或方法,如果您想采用 OO 方法),每个函数 return 是一个运算符实例或一个 TaskGroup 实例。
为了说明,my_dag.py
(下图)从 foo_bar_tasks.py
导入 operator-returning 函数,并从 [=16] 导入 TaskGroup-returning 函数=].在 DAG 上下文中,调用这些函数并将它们的 return 值分配给任务或任务组变量,这些变量可以分配 up-/downstream 依赖项。
dags/my_dag.py
:
# std lib imports
from airflow import DAG
# other airflow imports
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
with DAG(dag_id="my_dag", ...) as dag:
# logical chunk of tasks
foo_task = build_foo_task(dag=dag, ...)
bar_task = build_bar_task(dag=dag, ...)
# taskgroup
xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
foo_task >> bar_task >> xyzzy_taskgroup
plugins/includes/foo_bar_tasks.py
:
# std lib imports
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
def build_foo_task(dag: DAG, ...) -> FooOperator:
# ... logic here ...
foo_task = FooOperator(..., dag=dag)
return foo_task
def build_bar_task(dag: DAG, ...) -> BarOperator:
# ... logic here ...
bar_task = BarOperator(..., dag=dag)
return bar_task
plugins/includes/xyzzy_taskgroup.py
:
# std lib imports
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils import TaskGroup
# other airflow imports
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup")
# ... logic here ...
baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
# ... logic here ...
qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
baz_task >> qux_task
return xyzzy_taskgroup