Airflow:有没有办法在 dag 之外将操作员分组在一起?
Airflow: is there a way to group operators together outside of a dag?
有没有办法设计一个 python class 在 dag 之外实现特定的数据管道模式,以便将此 class 用于所有需要的数据管道这个模式?
示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证摄取候选文件。然后尝试在 Big Query 中以原始 table 加载数据,然后根据加载结果将文件分派到存档或被拒绝的文件夹中。
做一次容易,做1000次怎么办?我正在想办法优化工程时间。
可以考虑使用 SubDag,但它在 performances and is going to be deprecated anyway.
方面显示出局限性
任务组需要成为要实施的 dag 的一部分 https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35。
实现预期行为的一种方法可能是从利用动态 DAGing
的单个 python 文件生成 dag、任务组和任务
然而,不能在代码库的某处重复使用此特定文件中使用的代码。它反对 DRYness,尽管 DRYness 与可理解性始终是一种权衡。
您应该只创建自己的 Operator,然后在 DAG 中使用它。
扩展 BaseOperator 并使用挂钩到 BigQuery 或您需要的任何东西。
我也对这个问题感兴趣。 Airflow 2.0 发布了 Dynamic DAG 的新特性。虽然我不确定它是否会完全符合您的设计。它可以解决单一代码库的问题。在我的例子中,我有一个函数来创建一个带有必要参数的任务组。然后我迭代创建每个 DAG,该 DAG 具有创建具有不同参数的任务组的功能。这是我的伪代码的概述:
def create_task_group(group_id, a, b, c):
with TaskGroup(group_id=group_id) as my_task_group:
# add some tasks
pass
for x in LIST_OF_THINGS:
dag_id = f"{x}_workflow"
schedule_interval = SCHEDULE_INTERVAL[x]
with DAG(
dag_id,
start_date=START_DATE,
schedule_interval=schedule_interval,
) as globals()[dag_id]:
task_group = create_task_group(x, ..., ..., ...)
这里的LIST_OF_THINGS
代表不同配置的列表。每个DAG可以有不同的dag_id
、schedule_interval
、start_date
等。您可以在一些配置文件中定义您的任务配置,例如 JSON 或 YAML,并将其解析为字典。
我没试过,但从技术上讲,您也许可以将 create_task_group()
移动到某些 class 中,如果您需要重用相同的功能,也可以导入它。任务组还有一个好处就是可以给其他任务或任务组添加任务依赖,非常方便。
我看到了使用额外包为 Airflow DAG 配置 YAML 的概念,但我不确定它是否成熟。
在此处查看有关动态 DAG 的更多信息:https://www.astronomer.io/guides/dynamically-generating-dags
基于此article
解决这个问题的方法如下:
你可以在airflow./plugins中定义一个插件
让我们在 ./plugins/test_taskgroup.py
中创建一个示例任务组
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
def hello_world_py():
print('Hello World')
def build_taskgroup(dag: DAG) -> TaskGroup:
with TaskGroup(group_id="taskgroup") as taskgroup:
dummy_task = DummyOperator(
task_id="dummy_task",
dag=dag
)
python_task = PythonOperator(
task_id="python_task",
python_callable=hello_world_py,
dag=dag
)
dummy_task >> python_task
return taskgroup
您可以像这样在简单的 python DAG 中调用它:
from airflow.utils import task_group
from test_plugin import build_taskgroup
from airflow import DAG
with DAG(
dag_id="modularized_dag",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
) as dag:
task_group = build_taskgroup(dag)
这是结果
有没有办法设计一个 python class 在 dag 之外实现特定的数据管道模式,以便将此 class 用于所有需要的数据管道这个模式?
示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证摄取候选文件。然后尝试在 Big Query 中以原始 table 加载数据,然后根据加载结果将文件分派到存档或被拒绝的文件夹中。
做一次容易,做1000次怎么办?我正在想办法优化工程时间。
可以考虑使用 SubDag,但它在 performances and is going to be deprecated anyway.
方面显示出局限性任务组需要成为要实施的 dag 的一部分 https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35。
实现预期行为的一种方法可能是从利用动态 DAGing
的单个 python 文件生成 dag、任务组和任务然而,不能在代码库的某处重复使用此特定文件中使用的代码。它反对 DRYness,尽管 DRYness 与可理解性始终是一种权衡。
您应该只创建自己的 Operator,然后在 DAG 中使用它。 扩展 BaseOperator 并使用挂钩到 BigQuery 或您需要的任何东西。
我也对这个问题感兴趣。 Airflow 2.0 发布了 Dynamic DAG 的新特性。虽然我不确定它是否会完全符合您的设计。它可以解决单一代码库的问题。在我的例子中,我有一个函数来创建一个带有必要参数的任务组。然后我迭代创建每个 DAG,该 DAG 具有创建具有不同参数的任务组的功能。这是我的伪代码的概述:
def create_task_group(group_id, a, b, c):
with TaskGroup(group_id=group_id) as my_task_group:
# add some tasks
pass
for x in LIST_OF_THINGS:
dag_id = f"{x}_workflow"
schedule_interval = SCHEDULE_INTERVAL[x]
with DAG(
dag_id,
start_date=START_DATE,
schedule_interval=schedule_interval,
) as globals()[dag_id]:
task_group = create_task_group(x, ..., ..., ...)
这里的LIST_OF_THINGS
代表不同配置的列表。每个DAG可以有不同的dag_id
、schedule_interval
、start_date
等。您可以在一些配置文件中定义您的任务配置,例如 JSON 或 YAML,并将其解析为字典。
我没试过,但从技术上讲,您也许可以将 create_task_group()
移动到某些 class 中,如果您需要重用相同的功能,也可以导入它。任务组还有一个好处就是可以给其他任务或任务组添加任务依赖,非常方便。
我看到了使用额外包为 Airflow DAG 配置 YAML 的概念,但我不确定它是否成熟。
在此处查看有关动态 DAG 的更多信息:https://www.astronomer.io/guides/dynamically-generating-dags
基于此article
解决这个问题的方法如下:
你可以在airflow./plugins中定义一个插件 让我们在 ./plugins/test_taskgroup.py
中创建一个示例任务组from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
def hello_world_py():
print('Hello World')
def build_taskgroup(dag: DAG) -> TaskGroup:
with TaskGroup(group_id="taskgroup") as taskgroup:
dummy_task = DummyOperator(
task_id="dummy_task",
dag=dag
)
python_task = PythonOperator(
task_id="python_task",
python_callable=hello_world_py,
dag=dag
)
dummy_task >> python_task
return taskgroup
您可以像这样在简单的 python DAG 中调用它:
from airflow.utils import task_group
from test_plugin import build_taskgroup
from airflow import DAG
with DAG(
dag_id="modularized_dag",
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
) as dag:
task_group = build_taskgroup(dag)
这是结果