在两个 DAG 之间设置上游
Set upstream between two DAGs
我有两个 python 文件(proc1.py 和 proc2.py)调用一些 BaseOperator 来做一些处理。工作流程是这样的(为便于阅读而简化):
proc1:
calculate1->calcualte2->end
def generate_proc1_dag(dag):
run_this = BaseOperator()
cal1 = Calculate1()
cal1.set_upstream(run_this)
cal2 = calcualte2()
cal2.set_upstream(cal1)
end = BaseOperator()
end.set_upstream(cal2)
proc1_dag = DAG(dag_id='proc1', default_args=ARGS, schedule_interval=None)
generate_proc1_dag(proc1_dag)
proc2 (similar to proc1):
calcualteA->calcualteB->end
proc2.py 中的操作应该在 proc1 完成后开始。结果,我尝试创建一个新的 DAG (run_all.py),我尝试了这个:
def generate_run_all_dag(dag):
run_this = BaseOperator()
global proc1_dag
global proc2_dag
generate_proc1_dag(proc1_dag)
proc1_dag.set_upstream(run_this)
generate_proc2_dag(proc2_dag)
proc2_dag.set_upstream(proc1_dag)
end = BaseOperator()
end.set_upstream(proc2_dag)
run_all_dag = DAG(...)
generate_run_all_dag(run_all_dag)
首先,我不是 100% 确定我可以像在 run_all.py 中那样使用 set_upstream,但是,我不知道任何其他方式来提及 proc2 应该在之后启动proc1。
现在,当我使用它时,出现以下异常:airflow.exceptions.AirflowException:试图在还没有 DAG 的任务之间创建关系。为至少一项任务设置 DAG,然后重试
如有任何帮助,我们将不胜感激。
注意:proc1 和 proc2 独立工作,(如果可能)不应对它们进行任何更改。
您应该为此使用 "ExternalTaskSensor"。所以你的 proc2.py 将包含这样的内容:
sensor = ExternalTaskSensor (
task_id='proc2_sensor_task',
external_dag_id='proc1',
external_task_id='proc1_task_id',
dag=dag
)
proc_2_task.set_upstream(sensor)
但是,要使其正常工作,您需要在 proc1.py 中定义 'proc1_task_id' 并在 proc2.py 中定义 proc_2_task。
我建议您查看此处的教程,了解如何完成此操作的说明:https://airflow.incubator.apache.org/tutorial.html
我有两个 python 文件(proc1.py 和 proc2.py)调用一些 BaseOperator 来做一些处理。工作流程是这样的(为便于阅读而简化):
proc1:
calculate1->calcualte2->end
def generate_proc1_dag(dag):
run_this = BaseOperator()
cal1 = Calculate1()
cal1.set_upstream(run_this)
cal2 = calcualte2()
cal2.set_upstream(cal1)
end = BaseOperator()
end.set_upstream(cal2)
proc1_dag = DAG(dag_id='proc1', default_args=ARGS, schedule_interval=None)
generate_proc1_dag(proc1_dag)
proc2 (similar to proc1):
calcualteA->calcualteB->end
proc2.py 中的操作应该在 proc1 完成后开始。结果,我尝试创建一个新的 DAG (run_all.py),我尝试了这个:
def generate_run_all_dag(dag):
run_this = BaseOperator()
global proc1_dag
global proc2_dag
generate_proc1_dag(proc1_dag)
proc1_dag.set_upstream(run_this)
generate_proc2_dag(proc2_dag)
proc2_dag.set_upstream(proc1_dag)
end = BaseOperator()
end.set_upstream(proc2_dag)
run_all_dag = DAG(...)
generate_run_all_dag(run_all_dag)
首先,我不是 100% 确定我可以像在 run_all.py 中那样使用 set_upstream,但是,我不知道任何其他方式来提及 proc2 应该在之后启动proc1。
现在,当我使用它时,出现以下异常:airflow.exceptions.AirflowException:试图在还没有 DAG 的任务之间创建关系。为至少一项任务设置 DAG,然后重试
如有任何帮助,我们将不胜感激。 注意:proc1 和 proc2 独立工作,(如果可能)不应对它们进行任何更改。
您应该为此使用 "ExternalTaskSensor"。所以你的 proc2.py 将包含这样的内容:
sensor = ExternalTaskSensor (
task_id='proc2_sensor_task',
external_dag_id='proc1',
external_task_id='proc1_task_id',
dag=dag
)
proc_2_task.set_upstream(sensor)
但是,要使其正常工作,您需要在 proc1.py 中定义 'proc1_task_id' 并在 proc2.py 中定义 proc_2_task。
我建议您查看此处的教程,了解如何完成此操作的说明:https://airflow.incubator.apache.org/tutorial.html