如何使用 TaskFlow Api 在两个任务之间创建共享子任务?

How can I create a shared child between two tasks using TaskFlow Api?

我的代码如下所示:

def etl():
    for item in ['FIRST','SECCOND','THIRD']:
        if item == 'a':
            requests = ['Data1','Data3']
        elif item == 'b':
            requests = ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a,b = some_func
                vars_dict = {'a': a,
                             'b': b}
                return vars_dict

            @task(task_id=f'{account}_{data_name}_get_liveops_data')
            def taskb(vars_dict):
                some_other_func
                return True

            if data_name=='Data1':
                @task(task_id='last_task')
                def last_task(success):
                    dim_experiments.main()
                    return

            vars_dict = taska()
            success = taskb(vars_dict)
            last_task(success)


myc_dag = etl()

狗看起来像这样:

什么时候应该是这样的:

目标是 last_task 依赖于 taskataskb 除了下载 Data3 请求的 taskataskb .我无法使用 TaskFlow API

实现它

并行依赖性正在发生,因为调用 last_task() TaskFlow 函数并设置任务依赖性(隐式地通过 TaskFlow API)是在调用其他任务的同一循环中完成的。每次调用 TaskFlow 函数都会创建一个新的任务节点。如果 last_task 被拉到循环外并且只在循环内设置必要的依赖项,您将获得所需的结构。

让我们以您的代码的简化版本为例。

from datetime import datetime
from airflow.decorators import dag, task


@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    for item in ["a", "b"]:

        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        last_task(success)


myc_dag = etl()

在上面的 DAG 中,taska()taskb()last_task() TaskFlow 函数都被调用,并在循环中设置它们的任务依赖关系。所以,我们看到 2 条平行路径:

要让 last_task() 成为两个路径的共享下游任务,我们需要将调用拉到 last_task() (意味着我们只创建一个任务节点一次)但保持任务之间的依赖关系taskb()last_task() 完好无损。这可以通过示例的小重构来完成:

@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    last_task = last_task()

    for item in ["a", "b"]:
        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        success >> last_task


myc_dag = etl()

请注意,last_task() TaskFlow 函数是在创建其他任务的循环外部 调用的。这确保 last_task() 任务只创建一次。另一个更改是将 last_task() 调用设置为一个变量,然后使用此变量将任务依赖项声明为 taskb() (类似于您在原始文件中对 success 变量所做的操作代码片段)。通过这些小的更改,我们得到了 2 条路径,它们具有共享的最终任务 last_task():