为什么 'python_callable' 选项从 Airflow 2.0 中的 TriggerDagRunOperator 中移除?
Why was the 'python_callable' option removed from TriggerDagRunOperator in Airflow 2.0?
我可以看到,在 Airflow 2.0 之前,可以通过将 TriggerDagRunOperator 传递给它 python_callable
选项来使用带有条件的条件:
def foo(context, dag_run_obj):
if True:
return dag_run_obj
dag = DAG(dag_id='test_trigger_dag_run_for_Sid',
default_args={"owner" : "me",
"start_date":datetime.now()},
schedule_interval='*/1 * * * *')
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
trigger_dag_id="simple_dummy_dag_v1",
python_callable=foo,
dag=dag)
但是现在这个选项在最新版本的doc中已经消失了。
为什么 ?
如果没有它,我如何为触发器设置条件?
我不知道为什么会发生变化,但您可以使用 BranchPythonOperator
(解释得很好 here)。
你可以这样做:
# Function definition
def _branch_trigger_or_not():
condition_to_trigger = True # Change here to whatever you need
if condition_to_trigger:
return 'test_trigger_dagrun'
else:
return 'another_task_id'
### Other code here
# Inside DAG context
trigger_or_not = BranchPythonOperator(
task_id='trigger_or_not',
python_callable=_branch_trigger_or_not
)
# Dependencies definition
trigger_or_not >> [trigger, another_task]
我可以看到,在 Airflow 2.0 之前,可以通过将 TriggerDagRunOperator 传递给它 python_callable
选项来使用带有条件的条件:
def foo(context, dag_run_obj):
if True:
return dag_run_obj
dag = DAG(dag_id='test_trigger_dag_run_for_Sid',
default_args={"owner" : "me",
"start_date":datetime.now()},
schedule_interval='*/1 * * * *')
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
trigger_dag_id="simple_dummy_dag_v1",
python_callable=foo,
dag=dag)
但是现在这个选项在最新版本的doc中已经消失了。 为什么 ? 如果没有它,我如何为触发器设置条件?
我不知道为什么会发生变化,但您可以使用 BranchPythonOperator
(解释得很好 here)。
你可以这样做:
# Function definition
def _branch_trigger_or_not():
condition_to_trigger = True # Change here to whatever you need
if condition_to_trigger:
return 'test_trigger_dagrun'
else:
return 'another_task_id'
### Other code here
# Inside DAG context
trigger_or_not = BranchPythonOperator(
task_id='trigger_or_not',
python_callable=_branch_trigger_or_not
)
# Dependencies definition
trigger_or_not >> [trigger, another_task]