After a for loop, how do I set task dependencies for some independent static tasks? (Airflow)

I have a for loop that creates some intermediate tasks, and I also have some tasks that should run after the loop.

I set the task dependencies only inside the for loop, as suggested in many posts:

Example:

individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)

for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    # chain this iteration's tasks between the shared start and end tasks
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3

But for individual_task2 and individual_task3 I get this error:

Broken DAG: task_id already registered

But these are standalone tasks that are not defined inside the loop, so why do I get this error? What am I doing wrong?
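To see which dependency the loop body actually repeats, the `>>` chain can be expanded into (upstream, downstream) pairs per iteration and counted. This is a minimal plain-Python sketch, not Airflow code: `values` and `edges_declared_by_loop` are hypothetical names, and three placeholder values stand in for the elided list.

```python
from collections import Counter

# Hypothetical placeholders for the elided loop values in the question.
values = ["val1", "val2", "val3"]


def edges_declared_by_loop(values):
    """Return every (upstream, downstream) pair the question's loop declares, in order."""
    edges = []
    for i in values:
        # task_ids in the order they appear in the chain a >> b >> c >> d >> e
        chain = ["tk_one", "comp_" + i, "stats_" + i, "tk_two", "tk_three"]
        # each adjacent pair in the chain is one dependency, declared once per iteration
        edges.extend(zip(chain, chain[1:]))
    return edges


counts = Counter(edges_declared_by_loop(values))
duplicates = {edge: n for edge, n in counts.items() if n > 1}
print(duplicates)  # {('tk_two', 'tk_three'): 3}
```

Every other pair involves a `comp_`/`stats_` task created in that iteration, so only `tk_two -> tk_three` comes out as a repeat.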

Try this:

individual_task1 = SSHOperator(task_id='tk_one', ...)
individual_task2 = SSHOperator(task_id='tk_two', ...)
individual_task3 = SSHOperator(task_id='tk_three', ...)

for i in [val1, val2, val3, val4, ..., valn]:
    first_task_in_loop = SSHSparkSubmitOperator(task_id='comp_' + i, ...)
    second_task_in_loop = SSHOperator(task_id='stats_' + i, ...)
    # each iteration only adds edges that involve its own comp_/stats_ tasks
    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2

# declared once, after the loop, so this edge is registered only one time
individual_task2 >> individual_task3

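Airflow is probably complaining because you declare the same dependency several times. In the loop, `individual_task1 >> first_task_in_loop` and `second_task_in_loop >> individual_task2` involve a different loop task on every iteration, but `individual_task2 >> individual_task3` is the same edge each time, so it gets registered once per loop value. Declare it only once, after the loop.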
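The hypothesis can be reproduced with a toy model. In this sketch `Task` is a hypothetical stand-in for an Airflow operator (not the real `BaseOperator`), and its `>>` is assumed to reject an edge that already exists, which is the behaviour suspected above. `DuplicateDependencyError` and `build` are likewise made up for illustration.

```python
class DuplicateDependencyError(Exception):
    """Hypothetical error raised when the same edge is declared twice."""


class Task:
    """Toy stand-in for an Airflow operator; NOT the real Airflow API."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b registers the edge a -> b and returns b, so chains keep working.
        # Assumption: a repeated edge is rejected instead of silently ignored.
        if other.task_id in self.downstream:
            raise DuplicateDependencyError(
                f"Dependency {self.task_id} -> {other.task_id} already registered")
        self.downstream.add(other.task_id)
        return other


def build(values, edge_inside_loop):
    """Wire the DAG the way the question (True) or the answer (False) does."""
    t1, t2, t3 = Task("tk_one"), Task("tk_two"), Task("tk_three")
    for i in values:
        first = Task("comp_" + i)
        second = Task("stats_" + i)
        t1 >> first >> second >> t2
        if edge_inside_loop:
            t2 >> t3  # question's layout: re-declared on every iteration
    if not edge_inside_loop:
        t2 >> t3  # answer's layout: declared exactly once
    return t1, t2, t3


values = ["val1", "val2", "val3"]
try:
    build(values, edge_inside_loop=True)
except DuplicateDependencyError as exc:
    print(exc)  # Dependency tk_two -> tk_three already registered

t1, t2, t3 = build(values, edge_inside_loop=False)
print(sorted(t1.downstream))  # ['comp_val1', 'comp_val2', 'comp_val3']
print(sorted(t2.downstream))  # ['tk_three']
```

With the edge inside the loop, the second iteration fails on `tk_two -> tk_three`; moving that edge after the loop, as in the answer, registers every edge exactly once.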