如果任务 1 失败,如何在 运行 时间添加任务
How to add task at run time if task 1 is failed
如果任务 1 成功,我想执行任务 2 如果任务 1 失败,我想 运行 任务 3,如果需要,我想分配另一个流程。
基本上我想 运行 在没有 ssh 操作符的气流中执行条件任务。
from airflow import DAG
from airflow.operators import PythonOperator,BranchPythonOperator
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
def t2_error_task(context):
instance = context['task_instance']
if instance.task_id == "performExtract":
print ("Please implement something over this")
task_3 = PythonOperator(
task_id='performJoin1',
python_callable=performJoin1, # maybe main?
dag = dag
)
dag.add_task(task_3)
with DAG(
'manageWorkFlow',
catchup=False,
default_args={
'owner': 'Mannu',
'start_date': datetime(2018, 4, 13),
'schedule_interval':None,
'depends_on_past': False,
},
) as dag:
task_1 = PythonOperator(
task_id='performExtract',
python_callable=performExtract,
on_failure_callback=t2_error_task,
depends_on_past=True
)
task_2 = PythonOperator(
task_id='printSchemas',
depends_on_past=True,
python_callable=printSchemaAll, # maybe main?
)
task_2.set_upstream(task_1)
Airflow 不支持基于执行时状态动态添加任务。为了获得所需的行为,您应该将 task_3
添加到您的 dag,但将其 trigger_rule
更改为 all_failed
。在这种情况下,任务将在 task_1
成功时被标记为已跳过,但在失败时将被执行。
如果任务 1 成功,我想执行任务 2 如果任务 1 失败,我想 运行 任务 3,如果需要,我想分配另一个流程。
基本上我想 运行 在没有 ssh 操作符的气流中执行条件任务。
from airflow import DAG
from airflow.operators import PythonOperator,BranchPythonOperator
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
def t2_error_task(context):
instance = context['task_instance']
if instance.task_id == "performExtract":
print ("Please implement something over this")
task_3 = PythonOperator(
task_id='performJoin1',
python_callable=performJoin1, # maybe main?
dag = dag
)
dag.add_task(task_3)
with DAG(
'manageWorkFlow',
catchup=False,
default_args={
'owner': 'Mannu',
'start_date': datetime(2018, 4, 13),
'schedule_interval':None,
'depends_on_past': False,
},
) as dag:
task_1 = PythonOperator(
task_id='performExtract',
python_callable=performExtract,
on_failure_callback=t2_error_task,
depends_on_past=True
)
task_2 = PythonOperator(
task_id='printSchemas',
depends_on_past=True,
python_callable=printSchemaAll, # maybe main?
)
task_2.set_upstream(task_1)
Airflow 不支持基于执行时状态动态添加任务。为了获得所需的行为,您应该将 task_3
添加到您的 dag,但将其 trigger_rule
更改为 all_failed
。在这种情况下,任务将在 task_1
成功时被标记为已跳过,但在失败时将被执行。