在气流中组织 DAGs 任务依赖关系
Organize DAGs tasks dependencies in airflow
我有一个虚拟任务和一个 运行 并行使用循环的任务列表,对于一个任务,我想再执行一个串行任务。
到目前为止我尝试过的是:
dummy = DummyOperator(task_id='Dummy'.upper(),
dag=dag)
final = DummyOperator(task_id='FinalTask'.upper(),
dag=dag)
for task in ['Task1', 'Task2', 'Task3']:
if task == 'Task1'
task1 = DummyOperator(task_id='Task1-a'.upper(),
dag=dag)
else:
...
else:
...
tasks = DummyOperator(task_id=task),
dag=dag)
dummy >> tasks
tasks >> task1
tasks >> final
你没有解释我们如何知道 Task1 的子任务是什么的逻辑。
这应该构建您想要的结构:
tasks = ['Task1', 'Task2', 'Task3']
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 12, 17),
}
with DAG(
dag_id='dummyplay2',
default_args=default_args,
schedule_interval=None,
) as dag:
start_op = DummyOperator(task_id='start')
final_op = DummyOperator(task_id='final')
for task in tasks:
task_op = DummyOperator(task_id=task)
start_op >> task_op
if task == 'Task1':
#This loop creates the sub task logic.
#You can replace ord('b') with ord('z) and it will create more sub tasks
for i in range(ord('a'), ord('b')+1):
sub_task_op = DummyOperator(task_id=f'{task}_{chr(i)}')
task_op >> sub_task_op >> final_op
else:
task_op >> final_op
DAG 将是:
我有一个虚拟任务和一个 运行 并行使用循环的任务列表,对于一个任务,我想再执行一个串行任务。
到目前为止我尝试过的是:
dummy = DummyOperator(task_id='Dummy'.upper(),
dag=dag)
final = DummyOperator(task_id='FinalTask'.upper(),
dag=dag)
for task in ['Task1', 'Task2', 'Task3']:
if task == 'Task1'
task1 = DummyOperator(task_id='Task1-a'.upper(),
dag=dag)
else:
...
else:
...
tasks = DummyOperator(task_id=task),
dag=dag)
dummy >> tasks
tasks >> task1
tasks >> final
你没有解释我们如何知道 Task1 的子任务是什么的逻辑。
这应该构建您想要的结构:
tasks = ['Task1', 'Task2', 'Task3']
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 12, 17),
}
with DAG(
dag_id='dummyplay2',
default_args=default_args,
schedule_interval=None,
) as dag:
start_op = DummyOperator(task_id='start')
final_op = DummyOperator(task_id='final')
for task in tasks:
task_op = DummyOperator(task_id=task)
start_op >> task_op
if task == 'Task1':
#This loop creates the sub task logic.
#You can replace ord('b') with ord('z) and it will create more sub tasks
for i in range(ord('a'), ord('b')+1):
sub_task_op = DummyOperator(task_id=f'{task}_{chr(i)}')
task_op >> sub_task_op >> final_op
else:
task_op >> final_op
DAG 将是: