Airflow tasks in specific batches

I want to run a set of tasks like this:

a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]

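First run task a. When it completes, run b, c and d in parallel. Then, when the last of b, c and d has finished, start running e, f and g in parallel, and so on.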

But I get an error: unsupported operand type(s) for >>: 'list' and 'list'

What is the correct syntax for what I'm trying to do?

The error you're getting is because dependencies between two lists cannot be set with the bitshift operators: [task_a, task_b] >> [task_c, task_d] won't work.
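The underlying reason is Python's operator dispatch: for `x >> y`, Python calls `x.__rshift__(y)` and, if that is missing or returns `NotImplemented`, falls back to `y.__rrshift__(x)`. Airflow operators implement both methods, which is why `task >> [...]` and `[...] >> task` work, but a built-in `list` implements neither, so `list >> list` has nothing to dispatch to. The toy `Task` class below is a hypothetical stand-in for an operator, a minimal sketch of the mechanism rather than Airflow's actual implementation:

```python
class Task:
    """Hypothetical stand-in for an operator that supports >> with tasks and lists."""

    def __init__(self, name):
        self.name = name
        self.downstream = set()

    def __rshift__(self, other):
        # task >> task  or  task >> [tasks]
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t.name)
        return other  # returning the right operand allows chaining

    def __rrshift__(self, other):
        # [tasks] >> task: list has no __rshift__, so Python calls this instead
        for t in other:
            t.downstream.add(self.name)
        return self


a, b, c, d, e = (Task(n) for n in "abcde")
a >> [b, c]   # handled by Task.__rshift__
[b, c] >> d   # list lacks __rshift__, so Task.__rrshift__ handles it
try:
    [b, c] >> [d, e]  # neither operand defines the operator
except TypeError as err:
    print(err)  # unsupported operand type(s) for >>: 'list' and 'list'
```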

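IMHO, the simplest and cleanest way to achieve what you're looking for (there are other ways) is to use TaskGroup and set the dependencies between the groups, like this: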


from time import sleep

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
# On Airflow 2.3+, DummyOperator is deprecated in favor of
# airflow.operators.empty.EmptyOperator.
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'start_date': days_ago(1)
}


def _execute_task(**kwargs):
    # Airflow passes the task context as keyword arguments; 'ti' is the TaskInstance.
    print(f"Task_id: {kwargs['ti'].task_id}")
    sleep(10)


def _create_python_task(name):
    return PythonOperator(
        task_id=f'task_{name}',
        python_callable=_execute_task)


with DAG('parallel_tasks_example', schedule_interval='@once',
         default_args=default_args, catchup=False) as dag:

    task_a = DummyOperator(task_id='task_a')

    # Tasks inside a group have no dependencies on each other, so they can
    # run in parallel. Their task_ids get the group prefix, e.g. 'first_group.task_b'.
    with TaskGroup('first_group') as first_group:
        for name in 'bcd':
            _create_python_task(name)

    with TaskGroup('second_group') as second_group:
        for name in 'efg':
            _create_python_task(name)

    with TaskGroup('third_group') as third_group:
        for name in 'hijklm':
            _create_python_task(name)

    # Each group starts only after every task in the previous one has finished.
    task_a >> first_group >> second_group >> third_group

From the TaskGroup class definition:

A collection of tasks. When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across all tasks within the group if necessary.
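Concretely, when one group is set upstream of another, Airflow 2 links the upstream group's leaf tasks (no downstream task inside the group) to the downstream group's root tasks (no upstream task inside the group). In the example above the tasks inside each group are independent, so every task is both a root and a leaf, and `first_group >> second_group` turns into the all-to-all dependency the question asks for. The `Group` class and `link` function below are a hypothetical, simplified model of that rule, a sketch rather than the real `TaskGroup` code:

```python
from itertools import product


class Group:
    """Hypothetical, simplified model of a task group for dependency purposes."""

    def __init__(self, tasks, internal_edges=()):
        self.tasks = list(tasks)
        self.internal = set(internal_edges)  # (upstream, downstream) pairs inside the group

    def roots(self):
        # Tasks with no upstream task inside this group.
        has_upstream = {down for _, down in self.internal}
        return [t for t in self.tasks if t not in has_upstream]

    def leaves(self):
        # Tasks with no downstream task inside this group.
        has_downstream = {up for up, _ in self.internal}
        return [t for t in self.tasks if t not in has_downstream]


def link(upstream, downstream):
    """Edges created by `upstream >> downstream`: every leaf of the upstream
    group points at every root of the downstream group."""
    return set(product(upstream.leaves(), downstream.roots()))


first = Group("bcd")
second = Group("efg")
print(sorted(link(first, second)))  # all 9 pairs from b..d to e..g

# If a group had an internal chain x >> y, only its leaf y would feed the next group.
chained = Group("xy", internal_edges=[("x", "y")])
print(sorted(link(chained, second)))  # [('y', 'e'), ('y', 'f'), ('y', 'g')]
```

This is why the "if necessary" in the docstring matters: with independent tasks the rule degenerates to all-to-all, while a group with internal dependencies only exposes its entry and exit tasks to its neighbours.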

You can find an official example here.