Airflow taskflow - 运行 并行任务
Airflow taskflow - run task in parallele
想尝试新的任务流程API我到了需要 2 个并行任务的地步。
使用 Airflow v1,我习惯于做类似的事情
task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4
PythonOperator
我们现在调用任务的方式不同了
我怎样才能用 TaskFlow 做列表?
谢谢
如果每个任务都依赖于上一个任务的值,您可以通过以下方式实现它:
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag
@task
def task_1():
return 'first task'
@task
def task_2(value):
return 'second task'
@task
def task_3(value):
return 'third task'
@task
def task_4(value1, value2):
return 'forth task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow_Whosebug', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():
op_1 = task_1()
op_2 = task_2(op_1)
op_3 = task_3(op_1)
op_4 = task_4(op_2, op_3)
dag = my_dag()
您提到的语法也受支持,但您无法直接访问之前任务中的 xcom 值:
@task
def task_1():
return 'first task'
@task
def task_2():
return 'second task'
@task
def task_3():
return 'third task'
@task
def task_4():
return 'forth task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow_Whosebug', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():
op_1 = task_1()
op_2 = task_2()
op_3 = task_3()
op_4 = task_4()
op_1 >> [op_2, op_3]
[op_2, op_3] >> op_4
dag = my_dag()
可能您需要混合使用两种语法选项,具体取决于您想要实现的目标。
想尝试新的任务流程API我到了需要 2 个并行任务的地步。
使用 Airflow v1,我习惯于做类似的事情
task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4
PythonOperator
我怎样才能用 TaskFlow 做列表?
谢谢
如果每个任务都依赖于上一个任务的值,您可以通过以下方式实现它:
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag
@task
def task_1():
return 'first task'
@task
def task_2(value):
return 'second task'
@task
def task_3(value):
return 'third task'
@task
def task_4(value1, value2):
return 'forth task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow_Whosebug', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():
op_1 = task_1()
op_2 = task_2(op_1)
op_3 = task_3(op_1)
op_4 = task_4(op_2, op_3)
dag = my_dag()
您提到的语法也受支持,但您无法直接访问之前任务中的 xcom 值:
@task
def task_1():
return 'first task'
@task
def task_2():
return 'second task'
@task
def task_3():
return 'third task'
@task
def task_4():
return 'forth task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow_Whosebug', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():
op_1 = task_1()
op_2 = task_2()
op_3 = task_3()
op_4 = task_4()
op_1 >> [op_2, op_3]
[op_2, op_3] >> op_4
dag = my_dag()
可能您需要混合使用两种语法选项,具体取决于您想要实现的目标。