Airflow: how to set task dependencies for independent static tasks after a loop
I have a for loop, plus some tasks in the middle and some tasks after the loop.
I set the task dependencies only inside the for loop, as suggested in many posts.
Example:

    individual_task1 = SSHOperator(task_id='tk_one', ...)
    individual_task2 = SSHOperator(task_id='tk_two', ...)
    individual_task3 = SSHOperator(task_id='tk_three', ...)

    for i in [val1, val2, val3, val4, ..., valn]:
        first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
        second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
        individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3
But for individual_task2 and individual_task3 I get this error:
Broken Dag ... task_id already registered
These are standalone tasks that are not defined inside the loop, so why do I get this error? What am I doing wrong?
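To see what the loop actually declares, the sketch below replays it with a hypothetical stand-in class instead of real Airflow operators (Node and declared_edges are illustrative names, not Airflow APIs). Each a >> b is recorded as an (upstream, downstream) pair, which makes it visible that the edges touching the per-value tasks are declared once each, while tk_two >> tk_three is declared once per loop value.

```python
from collections import Counter


class Node:
    """Hypothetical stand-in for an operator: records each `a >> b` as an edge."""

    def __init__(self, task_id, edges):
        self.task_id = task_id
        self.edges = edges  # shared list of (upstream, downstream) pairs

    def __rshift__(self, other):
        # `a >> b` makes a upstream of b; returning b lets chains continue
        self.edges.append((self.task_id, other.task_id))
        return other


def declared_edges(values):
    """Replay the question's loop and count how often each edge is declared."""
    edges = []
    task1 = Node("tk_one", edges)
    task2 = Node("tk_two", edges)
    task3 = Node("tk_three", edges)
    for i in values:
        first = Node("comp_" + i, edges)
        second = Node("stats_" + i, edges)
        task1 >> first >> second >> task2 >> task3
    return Counter(edges)


counts = declared_edges(["a", "b", "c"])
print(counts[("tk_one", "comp_a")])    # 1
print(counts[("tk_two", "tk_three")])  # 3
```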
Try this:

    individual_task1 = SSHOperator(task_id='tk_one', ...)
    individual_task2 = SSHOperator(task_id='tk_two', ...)
    individual_task3 = SSHOperator(task_id='tk_three', ...)

    for i in [val1, val2, val3, val4, ..., valn]:
        first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
        second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
        # dependencies that involve the per-value tasks stay inside the loop
        individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2

    # set once, outside the loop, so this dependency is registered only once
    individual_task2 >> individual_task3
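Airflow is probably complaining because you set the same dependency more than once: with the whole chain inside the loop, individual_task2 >> individual_task3 is declared again on every iteration. Declaring it once, after the loop, avoids registering it repeatedly.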
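A toy model of the failure and the fix, assuming (as the error message suggests) that the Airflow version in use rejects a dependency that is already registered; other releases may only log a warning instead. StrictTask, DuplicateDependencyError and build are hypothetical names, not Airflow APIs, and the error text only loosely imitates Airflow's.

```python
class DuplicateDependencyError(Exception):
    """Raised by the toy model when the same edge is declared twice."""


class StrictTask:
    """Hypothetical toy operator that refuses to register a dependency twice."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()  # task_ids already registered downstream of this task

    def __rshift__(self, other):
        if other.task_id in self.downstream:
            raise DuplicateDependencyError(
                f"Dependency {self.task_id}, {other.task_id} already registered"
            )
        self.downstream.add(other.task_id)
        return other


def build(values, edge_after_loop):
    """Wire the tasks as in the question (False) or as in the answer (True)."""
    task1 = StrictTask("tk_one")
    task2 = StrictTask("tk_two")
    task3 = StrictTask("tk_three")
    for i in values:
        first = StrictTask("comp_" + i)
        second = StrictTask("stats_" + i)
        if edge_after_loop:
            task1 >> first >> second >> task2
        else:
            # question's version: task2 >> task3 is repeated on every pass
            task1 >> first >> second >> task2 >> task3
    if edge_after_loop:
        task2 >> task3  # answer's version: declared exactly once
    return task1, task2, task3


try:
    build(["a", "b"], edge_after_loop=False)
except DuplicateDependencyError as exc:
    print(exc)  # Dependency tk_two, tk_three already registered

task1, task2, task3 = build(["a", "b"], edge_after_loop=True)
print(sorted(task1.downstream))  # ['comp_a', 'comp_b']
```

In this model the question's wiring raises nothing with a single loop value, because the repeated edge only appears from the second iteration on.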