Airflow:在单个 DAG 文件中导入装饰任务与所有任务?

Airflow: Importing decorated Task vs all tasks in a single DAG file?

我最近开始使用 Apache Airflow 及其新概念之一 Taskflow API。我有一个包含多个装饰任务的 DAG,其中每个任务都有 50 多行代码。所以我决定将每个任务移动到一个单独的文件中。

在引用 Whosebug 之后,我可以以某种方式将 DAG 中的任务移动到每个任务的单独文件中。现在,我的问题是:

  1. 下面显示的两个代码示例是否工作相同? (我担心任务的范围)。
  2. 他们将如何共享数据 b/w 他们?
  3. 性能有什么不同吗? (我读到 Subdags are discouraged due to performance issues,尽管这不是 Subdags 所关心的)。

我在网络(和官方文档)中看到的所有代码示例都将所有任务放在一个文件中。

示例 1

import logging
from airflow.decorators import dag, task
from datetime import datetime

default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}

@dag(default_args=default_args, schedule_interval=None)
def No_Import_Tasks():
    # Task 1
    @task()
    def Task_A():
        logging.info(f"Task A: Received param None")
        # Some 100 lines of code
        return "A"

    # Task 2
    @task()
    def Task_B(a):
        logging.info(f"Task B: Received param {a}")
        # Some 100 lines of code
        return str(a + "B")

    a = Task_A()
    ab = Task_B(a)

No_Import_Tasks = No_Import_Tasks()

示例 2 文件夹结构:

- dags
    - tasks
        - Task_A.py
        - Task_B.py
    - Main_DAG.py

文件Task_A.py

import logging
from airflow.decorators import task

@task()
def Task_A():
    logging.info(f"Task A: Received param None")
    # Some 100 lines of code
    return "A"

文件Task_B.py

import logging
from airflow.decorators import task

@task()
def Task_B(a):
    logging.info(f"Task B: Received param {a}")
    # Some 100 lines of code
    return str(a + "B")

文件Main_Dag.py

from airflow.decorators import dag
from datetime import datetime
from tasks.Task_A import Task_A
from tasks.Task_B import Task_B

default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}

@dag(default_args=default_args, schedule_interval=None)
def Import_Tasks():
    a = Task_A()
    ab = Task_B(a)

Import_Tasks_dag = Import_Tasks()

提前致谢!

  1. 这两种方法实际上没有区别——无论是从逻辑还是性能的角度来看。

  2. Airflow 中的任务使用 XCom (https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) 在它们之间共享数据,通过数据库(或其他外部存储)有效地交换数据。 Airflow 中的两个任务——不管它们是在一个文件还是多个文件中定义的——无论如何都可以在完全不同的机器上执行(气流中没有任务亲和力——每个任务执行都与其他任务完全分开。所以它不会重要 - 再次 - 如果它们在一个或多个 Python 文件中。

  3. 性能应该差不多。也许拆分成几个文件的速度非常非常慢,但它应该完全可以忽略不计,甚至可能根本不存在 - 取决于你分发文件等方式的部署,但我无法想象这会产生任何可观察到的影响.