如何使用 TaskFlow Api 在两个任务之间创建共享子任务?
How can I create a shared child between two tasks using TaskFlow Api?
我的代码如下所示:
def etl():
for item in ['FIRST','SECCOND','THIRD']:
if item == 'a':
requests = ['Data1','Data3']
elif item == 'b':
requests = ['Data1']
for data_name in requests:
@task(task_id=f'{item}_{data_name}_task_a')
def taska():
a,b = some_func
vars_dict = {'a': a,
'b': b}
return vars_dict
@task(task_id=f'{account}_{data_name}_get_liveops_data')
def taskb(vars_dict):
some_other_func
return True
if data_name=='Data1':
@task(task_id='last_task')
def last_task(success):
dim_experiments.main()
return
vars_dict = taska()
success = taskb(vars_dict)
last_task(success)
myc_dag = etl()
狗看起来像这样:
什么时候应该是这样的:
目标是 last_task
依赖于 taska
和 taskb
除了下载 Data3
请求的 taska
和 taskb
.我无法使用 TaskFlow API
实现它
并行依赖性正在发生,因为调用 last_task()
TaskFlow 函数并设置任务依赖性(隐式地通过 TaskFlow API)是在调用其他任务的同一循环中完成的。每次调用 TaskFlow 函数都会创建一个新的任务节点。如果 last_task
被拉到循环外并且只在循环内设置必要的依赖项,您将获得所需的结构。
让我们以您的代码的简化版本为例。
from datetime import datetime
from airflow.decorators import dag, task
@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
@task(task_id="last_task")
def last_task(some_input=None):
...
for item in ["a", "b"]:
@task
def taska():
return {"a": "A", "b": "B"}
@task
def taskb(input):
...
success = taskb(taska())
last_task(success)
myc_dag = etl()
在上面的 DAG 中,taska()
、taskb()
和 last_task()
TaskFlow 函数都被调用,并在循环中设置它们的任务依赖关系。所以,我们看到 2 条平行路径:
要让 last_task()
成为两个路径的共享下游任务,我们需要将调用拉到 last_task()
(意味着我们只创建一个任务节点一次)但保持任务之间的依赖关系taskb()
和 last_task()
完好无损。这可以通过示例的小重构来完成:
@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
@task(task_id="last_task")
def last_task(some_input=None):
...
last_task = last_task()
for item in ["a", "b"]:
@task
def taska():
return {"a": "A", "b": "B"}
@task
def taskb(input):
...
success = taskb(taska())
success >> last_task
myc_dag = etl()
请注意,last_task()
TaskFlow 函数是在创建其他任务的循环外部 调用的。这确保 last_task()
任务只创建一次。另一个更改是将 last_task()
调用设置为一个变量,然后使用此变量将任务依赖项声明为 taskb()
(类似于您在原始文件中对 success
变量所做的操作代码片段)。通过这些小的更改,我们得到了 2 条路径,它们具有共享的最终任务 last_task()
:
我的代码如下所示:
def etl():
for item in ['FIRST','SECCOND','THIRD']:
if item == 'a':
requests = ['Data1','Data3']
elif item == 'b':
requests = ['Data1']
for data_name in requests:
@task(task_id=f'{item}_{data_name}_task_a')
def taska():
a,b = some_func
vars_dict = {'a': a,
'b': b}
return vars_dict
@task(task_id=f'{account}_{data_name}_get_liveops_data')
def taskb(vars_dict):
some_other_func
return True
if data_name=='Data1':
@task(task_id='last_task')
def last_task(success):
dim_experiments.main()
return
vars_dict = taska()
success = taskb(vars_dict)
last_task(success)
myc_dag = etl()
狗看起来像这样:
什么时候应该是这样的:
目标是 last_task
依赖于 taska
和 taskb
除了下载 Data3
请求的 taska
和 taskb
.我无法使用 TaskFlow API
并行依赖性正在发生,因为调用 last_task()
TaskFlow 函数并设置任务依赖性(隐式地通过 TaskFlow API)是在调用其他任务的同一循环中完成的。每次调用 TaskFlow 函数都会创建一个新的任务节点。如果 last_task
被拉到循环外并且只在循环内设置必要的依赖项,您将获得所需的结构。
让我们以您的代码的简化版本为例。
from datetime import datetime
from airflow.decorators import dag, task
@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
@task(task_id="last_task")
def last_task(some_input=None):
...
for item in ["a", "b"]:
@task
def taska():
return {"a": "A", "b": "B"}
@task
def taskb(input):
...
success = taskb(taska())
last_task(success)
myc_dag = etl()
在上面的 DAG 中,taska()
、taskb()
和 last_task()
TaskFlow 函数都被调用,并在循环中设置它们的任务依赖关系。所以,我们看到 2 条平行路径:
要让 last_task()
成为两个路径的共享下游任务,我们需要将调用拉到 last_task()
(意味着我们只创建一个任务节点一次)但保持任务之间的依赖关系taskb()
和 last_task()
完好无损。这可以通过示例的小重构来完成:
@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
@task(task_id="last_task")
def last_task(some_input=None):
...
last_task = last_task()
for item in ["a", "b"]:
@task
def taska():
return {"a": "A", "b": "B"}
@task
def taskb(input):
...
success = taskb(taska())
success >> last_task
myc_dag = etl()
请注意,last_task()
TaskFlow 函数是在创建其他任务的循环外部 调用的。这确保 last_task()
任务只创建一次。另一个更改是将 last_task()
调用设置为一个变量,然后使用此变量将任务依赖项声明为 taskb()
(类似于您在原始文件中对 success
变量所做的操作代码片段)。通过这些小的更改,我们得到了 2 条路径,它们具有共享的最终任务 last_task()
: