在 BranchPython Operator 之后跳过 Airflow 2.0 任务
Airflow 2.0 task getting skipped after BranchPython Operator
我在新版本的 Airflow 中摆弄分支,无论我尝试什么,都会跳过 BranchOperator 之后的所有任务。
这是我一直在努力完成的最简单的例子
from airflow.decorators import dag, task
from datetime import timedelta, datetime
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import logging
logger = logging.getLogger("airflow.task")
@dag(
schedule_interval="0 0 * * *",
start_date=datetime.today() - timedelta(days=2),
dagrun_timeout=timedelta(minutes=60),
)
def WhosebugExample():
@task
def task_A():
logging.info("TASK A")
@task
def task_B():
logging.info("TASK B")
@task
def task_C():
logging.info("TASK C")
@task
def task_D():
logging.info("TASK D")
return {"parameter":0.5}
def _choose_task(task_parameters,**kwargs):
logging.info(task_parameters["parameter"])
if task_parameters["parameter"]<0.5:
logging.info("SUCCESSS ")
return ['branch_1', 'task_final']
else:
logging.info("RIP")
return ['branch_2', 'task_final']
@task(task_id="branch_1")
def branch_1():
logging.info("branch_1...")
@task(task_id="branch_2")
def branch_2():
logging.info("branch_2")
@task(task_id="task_final")
def task_final():
logging.info("task_final")
parameter = task_A() >> task_B() >> task_C() >> task_D()
choose_task = BranchPythonOperator(
task_id='choose_best_model',
op_kwargs={"task_parameters":parameter},
python_callable=_choose_task,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
)
choose_task >> [branch_1(), branch_2()] >> task_final()
dag = WhosebugExample ()
有线索吗?我怀疑触发规则。我是 Airflow 的初学者,所以我不会放弃我忽略的任何其他问题
您应该在 task_final
上设置触发规则。
您希望 task_final
在 branch_1
和 branch_2
完成执行后执行(无论其中一个是 executed/skipped),因此您需要设置 all done 触发规则:
@task(task_id="task_final", trigger_rule=TriggerRule.ALL_DONE)
def task_final():
logging.info("task_final")
我在新版本的 Airflow 中摆弄分支,无论我尝试什么,都会跳过 BranchOperator 之后的所有任务。
这是我一直在努力完成的最简单的例子
from airflow.decorators import dag, task
from datetime import timedelta, datetime
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import logging
logger = logging.getLogger("airflow.task")
@dag(
schedule_interval="0 0 * * *",
start_date=datetime.today() - timedelta(days=2),
dagrun_timeout=timedelta(minutes=60),
)
def WhosebugExample():
@task
def task_A():
logging.info("TASK A")
@task
def task_B():
logging.info("TASK B")
@task
def task_C():
logging.info("TASK C")
@task
def task_D():
logging.info("TASK D")
return {"parameter":0.5}
def _choose_task(task_parameters,**kwargs):
logging.info(task_parameters["parameter"])
if task_parameters["parameter"]<0.5:
logging.info("SUCCESSS ")
return ['branch_1', 'task_final']
else:
logging.info("RIP")
return ['branch_2', 'task_final']
@task(task_id="branch_1")
def branch_1():
logging.info("branch_1...")
@task(task_id="branch_2")
def branch_2():
logging.info("branch_2")
@task(task_id="task_final")
def task_final():
logging.info("task_final")
parameter = task_A() >> task_B() >> task_C() >> task_D()
choose_task = BranchPythonOperator(
task_id='choose_best_model',
op_kwargs={"task_parameters":parameter},
python_callable=_choose_task,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
)
choose_task >> [branch_1(), branch_2()] >> task_final()
dag = WhosebugExample ()
有线索吗?我怀疑触发规则。我是 Airflow 的初学者,所以我不会放弃我忽略的任何其他问题
您应该在 task_final
上设置触发规则。
您希望 task_final
在 branch_1
和 branch_2
完成执行后执行(无论其中一个是 executed/skipped),因此您需要设置 all done 触发规则:
@task(task_id="task_final", trigger_rule=TriggerRule.ALL_DONE)
def task_final():
logging.info("task_final")