Airflow 2.0:使用 Taskflow API 在 class 中封装 DAG

Airflow 2.0: Encapsulating DAG in class using Taskflow API

我有管道,其中机制总是相同的,两个任务的序列。 所以我尝试通过父抽象class(使用TaskFlow API)来抽象它的构造:

from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime

def AbstractDag(ABC):
    @abstractmethod
    def task_1(self):
        """task 1"""

    @abstractmethod
    def task_2(self, data):
        """task 2"""

    def dag_wrapper(self):
        @dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
        def dag():
            @task(task_id='task_1')
            def task_1():
                return self.task_1()

            @task(task_id='task_2')
            def task_2(data):
                return self.task_2(data)

            task_2(task_1())

        return dag

但是当我尝试继承这个class时,我在界面中看不到我的dag:

class MyCustomDag(AbstractDag):
    def task_1(self):
        return 2

    @abstractmethod
    def task_2(self, data):
        print(data)


custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()

你知道怎么做吗?或者更好的想法来抽象这个?

非常感谢! 尼古拉斯

我能够让您的示例 DAG 在 UI 中呈现,只需进行一些小的调整:

  • MyCustomDag.task_2 方法不需要修饰为 abstractmethod
  • 使用 dag() 作为包装的 DAG 对象函数名称有其问题,因为它也是装饰器名称。
  • AbstractDag.dag_wrapper 方法中,您确实需要调用 @dag 装饰函数。

这是我使用的代码:

from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime

class AbstractDag(ABC):
    @abstractmethod
    def task_1(self):
        """task 1"""

    @abstractmethod
    def task_2(self, data):
        """task 2"""

    def dag_wrapper(self):
        @dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
        def _dag():
            @task(task_id='task_1')
            def task_1():
                return self.task_1()

            @task(task_id='task_2')
            def task_2(data):
                return self.task_2(data)

            task_2(task_1())

        return _dag()


class MyCustomDag(AbstractDag):
    def task_1(self):
        return 2

    def task_2(self, data):
        print(data)


custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()