Airflow - 根据条件停止 DAG(跳过分支后的剩余任务)
Airflow - Stop DAG based on condition (skip remaining tasks after branch)
我是airflow的新手,所以我在这里有疑问。
如果满足第一个任务的条件,我想要 运行 DAG。如果条件不满足,我想在第一个任务后停止 dag。
示例:
# first task
def get_number_func(**kwargs):
number = randint(0, 10)
print(number)
if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG
# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))
# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)
# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)
start_op >> continue_op
我只运行第二个任务满足人数条件。如果条件未验证,则 DAG 不应 运行 第二个任务。
我该如何执行?我不想使用 xcom、全局变量或虚拟任务。
提前致谢!
你看过ShortCircuitOperator
了吗?此任务根据条件是 True 还是 False 来控制您的任务流程。如果条件为真,下游任务将继续。否则,将跳过所有下游任务。尝试将您的第一个任务更改为 ShortCircuitOperator
并将 get_number_func
函数更新为 return True 或 False。
这是我使用您的代码进行的测试:
from airflow.decorators import dag, task
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from datetime import datetime
default_args = dict(
start_date=datetime(2021, 4, 26),
owner="me",
retries=0,
)
dag_args = dict(
dag_id="short_circuit",
schedule_interval=None,
default_args=default_args,
catchup=False,
)
def get_number_func(**kwargs):
from random import randint
number = randint(0, 10)
print(number)
if number >= 5:
print("A")
return True
else:
# STOP DAG
return False
def continue_func(**kwargs):
pass
with DAG(**dag_args) as dag:
# first task declaration
start_op = ShortCircuitOperator(
task_id="get_number",
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
)
# second task declaration
continue_op = PythonOperator(
task_id="continue_task",
provide_context=True,
python_callable=continue_func,
op_kwargs={},
)
start_op >> continue_op
我是airflow的新手,所以我在这里有疑问。
如果满足第一个任务的条件,我想要 运行 DAG。如果条件不满足,我想在第一个任务后停止 dag。
示例:
# first task
def get_number_func(**kwargs):
number = randint(0, 10)
print(number)
if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG
# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))
# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)
# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)
start_op >> continue_op
我只运行第二个任务满足人数条件。如果条件未验证,则 DAG 不应 运行 第二个任务。
我该如何执行?我不想使用 xcom、全局变量或虚拟任务。
提前致谢!
你看过ShortCircuitOperator
了吗?此任务根据条件是 True 还是 False 来控制您的任务流程。如果条件为真,下游任务将继续。否则,将跳过所有下游任务。尝试将您的第一个任务更改为 ShortCircuitOperator
并将 get_number_func
函数更新为 return True 或 False。
这是我使用您的代码进行的测试:
from airflow.decorators import dag, task
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from datetime import datetime
default_args = dict(
start_date=datetime(2021, 4, 26),
owner="me",
retries=0,
)
dag_args = dict(
dag_id="short_circuit",
schedule_interval=None,
default_args=default_args,
catchup=False,
)
def get_number_func(**kwargs):
from random import randint
number = randint(0, 10)
print(number)
if number >= 5:
print("A")
return True
else:
# STOP DAG
return False
def continue_func(**kwargs):
pass
with DAG(**dag_args) as dag:
# first task declaration
start_op = ShortCircuitOperator(
task_id="get_number",
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
)
# second task declaration
continue_op = PythonOperator(
task_id="continue_task",
provide_context=True,
python_callable=continue_func,
op_kwargs={},
)
start_op >> continue_op