Airflow 2.0:使用 Taskflow API 在 class 中封装 DAG
Airflow 2.0: Encapsulating DAG in class using Taskflow API
我有管道,其中机制总是相同的,两个任务的序列。
所以我尝试通过父抽象class(使用TaskFlow API)来抽象它的构造:
from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime
def AbstractDag(ABC):
@abstractmethod
def task_1(self):
"""task 1"""
@abstractmethod
def task_2(self, data):
"""task 2"""
def dag_wrapper(self):
@dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
def dag():
@task(task_id='task_1')
def task_1():
return self.task_1()
@task(task_id='task_2')
def task_2(data):
return self.task_2(data)
task_2(task_1())
return dag
但是当我尝试继承这个class时,我在界面中看不到我的dag:
class MyCustomDag(AbstractDag):
def task_1(self):
return 2
@abstractmethod
def task_2(self, data):
print(data)
custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()
你知道怎么做吗?或者更好的想法来抽象这个?
非常感谢!
尼古拉斯
我能够让您的示例 DAG 在 UI 中呈现,只需进行一些小的调整:
MyCustomDag.task_2
方法不需要修饰为 abstractmethod
。
- 使用
dag()
作为包装的 DAG 对象函数名称有其问题,因为它也是装饰器名称。
- 在
AbstractDag.dag_wrapper
方法中,您确实需要调用 @dag
装饰函数。
这是我使用的代码:
from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime
class AbstractDag(ABC):
@abstractmethod
def task_1(self):
"""task 1"""
@abstractmethod
def task_2(self, data):
"""task 2"""
def dag_wrapper(self):
@dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
def _dag():
@task(task_id='task_1')
def task_1():
return self.task_1()
@task(task_id='task_2')
def task_2(data):
return self.task_2(data)
task_2(task_1())
return _dag()
class MyCustomDag(AbstractDag):
def task_1(self):
return 2
def task_2(self, data):
print(data)
custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()
我有管道,其中机制总是相同的,两个任务的序列。 所以我尝试通过父抽象class(使用TaskFlow API)来抽象它的构造:
from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime
def AbstractDag(ABC):
@abstractmethod
def task_1(self):
"""task 1"""
@abstractmethod
def task_2(self, data):
"""task 2"""
def dag_wrapper(self):
@dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
def dag():
@task(task_id='task_1')
def task_1():
return self.task_1()
@task(task_id='task_2')
def task_2(data):
return self.task_2(data)
task_2(task_1())
return dag
但是当我尝试继承这个class时,我在界面中看不到我的dag:
class MyCustomDag(AbstractDag):
def task_1(self):
return 2
@abstractmethod
def task_2(self, data):
print(data)
custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()
你知道怎么做吗?或者更好的想法来抽象这个?
非常感谢! 尼古拉斯
我能够让您的示例 DAG 在 UI 中呈现,只需进行一些小的调整:
MyCustomDag.task_2
方法不需要修饰为abstractmethod
。- 使用
dag()
作为包装的 DAG 对象函数名称有其问题,因为它也是装饰器名称。 - 在
AbstractDag.dag_wrapper
方法中,您确实需要调用@dag
装饰函数。
这是我使用的代码:
from abc import ABC, abstractmethod
from airflow.decorators import dag, task
from datetime import datetime
class AbstractDag(ABC):
@abstractmethod
def task_1(self):
"""task 1"""
@abstractmethod
def task_2(self, data):
"""task 2"""
def dag_wrapper(self):
@dag(schedule_interval=None, start_date=datetime(2022, 1, 1))
def _dag():
@task(task_id='task_1')
def task_1():
return self.task_1()
@task(task_id='task_2')
def task_2(data):
return self.task_2(data)
task_2(task_1())
return _dag()
class MyCustomDag(AbstractDag):
def task_1(self):
return 2
def task_2(self, data):
print(data)
custom_dag = MyCustomDag()
dag_object = custom_dag.dag_wrapper()