Airflow tasks in specific batches
I want to run a group of tasks like this:

a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]

Task a runs first; once it completes, b, c, and d run in parallel; then, when the last of b, c, d finishes, e, f, and g start in parallel, and so on.

But I get an error: unsupported operand type(s) for >>: 'list' and 'list'

What is the correct syntax for what I'm trying to do?
The error you are getting is because setting dependencies between two lists with the bitshift operators is not supported: [task_a, task_b] >> [task_c, task_d] will not work.
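To see why, here is a minimal sketch (plain Python, no Airflow needed) of the operator dispatch involved. The Task class below is a hypothetical stand-in for Airflow's BaseOperator, which overloads >> in the same spirit: a task on either side of the operator can handle a list on the other side, but when both operands are plain lists, Python finds no __rshift__ or __rrshift__ to call and raises exactly the TypeError you saw.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Handles `task >> task` and `task >> [task, ...]`.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.append(t.task_id)
        return other

    def __rrshift__(self, other):
        # Handles `[task, ...] >> task`: Python falls back to this reflected
        # method because a plain list has no __rshift__ of its own.
        for t in other:
            t.downstream.append(self.task_id)
        return self


a, b, c, d = (Task(x) for x in "abcd")

a >> [b, c]   # OK: calls a.__rshift__([b, c])
[b, c] >> d   # OK: calls d.__rrshift__([b, c])

try:
    [a, b] >> [c, d]  # neither list defines __rshift__/__rrshift__
except TypeError as e:
    print(e)  # unsupported operand type(s) for >>: 'list' and 'list'
```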
IMHO, the simplest and cleanest way to achieve what you are looking for (there are other ways) is to use TaskGroup and set the dependencies between the groups, like this:

Graph View:
from time import sleep

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'start_date': days_ago(1)
}


def _execute_task(**kwargs):
    print(f"Task_id: {kwargs['ti'].task_id}")
    sleep(10)


def _create_python_task(name):
    return PythonOperator(
        task_id=f'task_{name}',
        python_callable=_execute_task)


with DAG('parallel_tasks_example', schedule_interval='@once',
         default_args=default_args, catchup=False) as dag:

    task_a = DummyOperator(task_id='task_a')

    with TaskGroup('first_group') as first_group:
        for name in list('bcd'):
            task = _create_python_task(name)

    with TaskGroup('second_group') as second_group:
        for name in list('efg'):
            task = _create_python_task(name)

    with TaskGroup('third_group') as third_group:
        for name in list('hijklm'):
            task = _create_python_task(name)

    task_a >> first_group >> second_group >> third_group
From the TaskGroup class definition:

A collection of tasks. When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across all tasks within the group if necessary.

You can find an official example here.
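As a sketch of one of those "other ways": Airflow also ships the helpers chain and cross_downstream in airflow.models.baseoperator, and cross_downstream can express the same batching without TaskGroups by creating every pairwise dependency between two lists. The snippet below assumes tasks a through m are operators already defined in a DAG, and needs a working Airflow installation to run; note the Graph View will be much busier than with groups, since every cross edge is drawn explicitly.

```python
from airflow.models.baseoperator import cross_downstream

# Fan out from a to the first batch.
a >> [b, c, d]

# Each of b, c, d becomes upstream of each of e, f, g, so with the default
# all_success trigger rule the second batch starts only when the first is done.
cross_downstream([b, c, d], [e, f, g])
cross_downstream([e, f, g], [h, i, j, k, l, m])
```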