Airflow:有没有办法在 dag 之外将操作员分组在一起?

Airflow: is there a way to group operators together outside of a dag?

有没有办法设计一个 python class 在 dag 之外实现特定的数据管道模式,以便将此 class 用于所有需要的数据管道这个模式?

示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证摄取候选文件。然后尝试在 Big Query 中以原始 table 加载数据,然后根据加载结果将文件分派到存档或被拒绝的文件夹中。

做一次容易,做1000次怎么办?我正在想办法优化工程时间。

可以考虑使用 SubDag,但它在 performances and is going to be deprecated anyway.

方面显示出局限性

任务组需要成为要实施的 dag 的一部分 https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35

实现预期行为的一种方法可能是从利用动态 DAGing

的单个 python 文件生成 dag、任务组和任务

然而,不能在代码库的某处重复使用此特定文件中使用的代码。它反对 DRYness,尽管 DRYness 与可理解性始终是一种权衡。

您应该只创建自己的 Operator,然后在 DAG 中使用它。 扩展 BaseOperator 并使用挂钩到 BigQuery 或您需要的任何东西。

我也对这个问题感兴趣。 Airflow 2.0 发布了 Dynamic DAG 的新特性。虽然我不确定它是否会完全符合您的设计。它可以解决单一代码库的问题。在我的例子中,我有一个函数来创建一个带有必要参数的任务组。然后我迭代创建每个 DAG,该 DAG 具有创建具有不同参数的任务组的功能。这是我的伪代码的概述:

def create_task_group(group_id, a, b, c):
    with TaskGroup(group_id=group_id) as my_task_group:
        # add some tasks
        pass

for x in LIST_OF_THINGS:
    dag_id = f"{x}_workflow"
    schedule_interval = SCHEDULE_INTERVAL[x]

    with DAG(
        dag_id,
        start_date=START_DATE,
        schedule_interval=schedule_interval,
    ) as globals()[dag_id]:
        task_group = create_task_group(x, ..., ..., ...)

这里的LIST_OF_THINGS代表不同配置的列表。每个DAG可以有不同的dag_idschedule_intervalstart_date等。您可以在一些配置文件中定义您的任务配置,例如 JSON 或 YAML,并将其解析为字典。

我没试过,但从技术上讲,您也许可以将 create_task_group() 移动到某些 class 中,如果您需要重用相同的功能,也可以导入它。任务组还有一个好处就是可以给其他任务或任务组添加任务依赖,非常方便。

我看到了使用额外包为 Airflow DAG 配置 YAML 的概念,但我不确定它是否成熟。

在此处查看有关动态 DAG 的更多信息:https://www.astronomer.io/guides/dynamically-generating-dags

基于此article

解决这个问题的方法如下:

你可以在airflow./plugins中定义一个插件 让我们在 ./plugins/test_taskgroup.py

中创建一个示例任务组
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

def hello_world_py():
        print('Hello World')

def build_taskgroup(dag: DAG) -> TaskGroup:
    
    with TaskGroup(group_id="taskgroup") as taskgroup:
        dummy_task = DummyOperator(
            task_id="dummy_task",
            dag=dag
        )
        python_task = PythonOperator(
            task_id="python_task",
            python_callable=hello_world_py,
            dag=dag
        )

    dummy_task >> python_task
    return taskgroup

您可以像这样在简单的 python DAG 中调用它:

from airflow.utils import task_group
from test_plugin import build_taskgroup
from airflow import DAG


with DAG(
    dag_id="modularized_dag",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
) as dag:

    task_group = build_taskgroup(dag)

这是结果