Airflow:在单个 DAG 文件中导入装饰任务与所有任务?
Airflow: Importing decorated Task vs all tasks in a single DAG file?
我最近开始使用 Apache Airflow 及其新概念之一 Taskflow API。我有一个包含多个装饰任务的 DAG,其中每个任务都有 50 多行代码。所以我决定将每个任务移动到一个单独的文件中。
在引用 Whosebug 之后,我可以以某种方式将 DAG 中的任务移动到每个任务的单独文件中。现在,我的问题是:
- 下面显示的两个代码示例是否工作相同? (我担心任务的范围)。
- 他们将如何共享数据 b/w 他们?
- 性能有什么不同吗? (我读到 Subdags are discouraged due to performance issues,尽管这不是 Subdags 所关心的)。
我在网络(和官方文档)中看到的所有代码示例都将所有任务放在一个文件中。
示例 1
import logging
from airflow.decorators import dag, task
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def No_Import_Tasks():
# Task 1
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
# Task 2
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
a = Task_A()
ab = Task_B(a)
No_Import_Tasks = No_Import_Tasks()
示例 2 文件夹结构:
- dags
- tasks
- Task_A.py
- Task_B.py
- Main_DAG.py
文件Task_A.py
import logging
from airflow.decorators import task
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
文件Task_B.py
import logging
from airflow.decorators import task
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
文件Main_Dag.py
from airflow.decorators import dag
from datetime import datetime
from tasks.Task_A import Task_A
from tasks.Task_B import Task_B
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def Import_Tasks():
a = Task_A()
ab = Task_B(a)
Import_Tasks_dag = Import_Tasks()
提前致谢!
这两种方法实际上没有区别——无论是从逻辑还是性能的角度来看。
Airflow 中的任务使用 XCom (https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) 在它们之间共享数据,通过数据库(或其他外部存储)有效地交换数据。 Airflow 中的两个任务——不管它们是在一个文件还是多个文件中定义的——无论如何都可以在完全不同的机器上执行(气流中没有任务亲和力——每个任务执行都与其他任务完全分开。所以它不会重要 - 再次 - 如果它们在一个或多个 Python 文件中。
性能应该差不多。也许拆分成几个文件的速度非常非常慢,但它应该完全可以忽略不计,甚至可能根本不存在 - 取决于你分发文件等方式的部署,但我无法想象这会产生任何可观察到的影响.
我最近开始使用 Apache Airflow 及其新概念之一 Taskflow API。我有一个包含多个装饰任务的 DAG,其中每个任务都有 50 多行代码。所以我决定将每个任务移动到一个单独的文件中。
在引用 Whosebug 之后,我可以以某种方式将 DAG 中的任务移动到每个任务的单独文件中。现在,我的问题是:
- 下面显示的两个代码示例是否工作相同? (我担心任务的范围)。
- 他们将如何共享数据 b/w 他们?
- 性能有什么不同吗? (我读到 Subdags are discouraged due to performance issues,尽管这不是 Subdags 所关心的)。
我在网络(和官方文档)中看到的所有代码示例都将所有任务放在一个文件中。
示例 1
import logging
from airflow.decorators import dag, task
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def No_Import_Tasks():
# Task 1
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
# Task 2
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
a = Task_A()
ab = Task_B(a)
No_Import_Tasks = No_Import_Tasks()
示例 2 文件夹结构:
- dags
- tasks
- Task_A.py
- Task_B.py
- Main_DAG.py
文件Task_A.py
import logging
from airflow.decorators import task
@task()
def Task_A():
logging.info(f"Task A: Received param None")
# Some 100 lines of code
return "A"
文件Task_B.py
import logging
from airflow.decorators import task
@task()
def Task_B(a):
logging.info(f"Task B: Received param {a}")
# Some 100 lines of code
return str(a + "B")
文件Main_Dag.py
from airflow.decorators import dag
from datetime import datetime
from tasks.Task_A import Task_A
from tasks.Task_B import Task_B
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
@dag(default_args=default_args, schedule_interval=None)
def Import_Tasks():
a = Task_A()
ab = Task_B(a)
Import_Tasks_dag = Import_Tasks()
提前致谢!
这两种方法实际上没有区别——无论是从逻辑还是性能的角度来看。
Airflow 中的任务使用 XCom (https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) 在它们之间共享数据,通过数据库(或其他外部存储)有效地交换数据。 Airflow 中的两个任务——不管它们是在一个文件还是多个文件中定义的——无论如何都可以在完全不同的机器上执行(气流中没有任务亲和力——每个任务执行都与其他任务完全分开。所以它不会重要 - 再次 - 如果它们在一个或多个 Python 文件中。
性能应该差不多。也许拆分成几个文件的速度非常非常慢,但它应该完全可以忽略不计,甚至可能根本不存在 - 取决于你分发文件等方式的部署,但我无法想象这会产生任何可观察到的影响.