Airflow 2.0.0+ - Pass a Dynamically Generated Dictionary to DAG Triggered by TriggerDagRunOperator
Previously, I used the python_callable parameter of the TriggerDagRunOperator to dynamically alter the dag_run_obj payload that is passed to the newly triggered DAG.
Since this was removed in Airflow 2.0.0 (pull request: https://github.com/apache/airflow/pull/6317), is there a way to do this without creating a custom TriggerDagRunOperator?
For context, here is my code flow:
# Poll Amazon S3 bucket for newly received files
fileSensor_tsk = S3KeySensor()

# Use the chooseDAGBasedOnInput function to create the dag_run object
# (previously, python_callable was used directly in TriggerDagRunOperator
# to create the dag_run object for the newly triggered DAG).
# The dag_run object passes the received file name details to the new DAG
# so it can complete its own work.
chooseDAGTrigger_tsk = BranchPythonOperator(
    task_id='chooseDAGTrigger_tsk',
    python_callable=chooseDAGBasedOnInput,
    provide_context=True
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id='triggerNewDAG_tsk',
    trigger_dag_id='1000_NEW_LOAD'
)

triggerNewDAG2_tsk = TriggerDagRunOperator(
    task_id='triggerNew2DAG_tsk',
    trigger_dag_id='1000_NEW2_LOAD'
)
...
Any help or commentary is greatly appreciated!
EDIT - Adding the python_callable function previously used in the TriggerDagRunOperator:
def intakeFile(context, dag_run_obj):
    # read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get('bucket_name')
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...
    dag_run_obj.payload = {
        'filePath': workingPath,
        'source': source,
        'fileName': fileName
    }
    return dag_run_obj
The TriggerDagRunOperator now takes a conf parameter to which a dictionary can be provided as the conf object for the DagRun. Here is more information on triggering DAGs which you may find helpful as well.
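For example, a minimal sketch reusing the '1000_NEW_LOAD' DAG id and payload keys from the question (the payload values here are placeholder assumptions):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Pass a dictionary to the triggered DAG; it becomes that run's dag_run.conf.
triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="1000_NEW_LOAD",
    conf={"filePath": "/data/incoming", "source": "s3", "fileName": "file.csv"},  # placeholder values
)

The triggered DAG can then read the dictionary from its run context, e.g. kwargs["dag_run"].conf.get("fileName") inside a PythonOperator callable, or {{ dag_run.conf['fileName'] }} in a templated field.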
EDIT
Since you need to execute a function to determine which DAG to trigger, and you do not want to create a custom TriggerDagRunOperator, you could execute intakeFile() in a PythonOperator (or use the @task decorator with the TaskFlow API; see the sketch after the example below) and use the return value as the conf argument of the TriggerDagRunOperator. As part of Airflow 2.0, return values are automatically pushed to XCom in many operators, the PythonOperator included.
The general idea:
import os

from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def intakeFile(*args, **kwargs):
    # read from S3, get filename and pass to the triggered DAG
    bucket_name = os.environ.get("bucket_name")
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...
    # workingPath, source, and fileName are derived in the elided logic above.
    # Return the payload directly; the return value is pushed to XCom
    # automatically under the key 'return_value'.
    return {
        "filePath": workingPath,
        "source": source,
        "fileName": fileName,
    }


get_dag_to_trigger = PythonOperator(
    task_id="get_dag_to_trigger",
    python_callable=intakeFile
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="1000_NEW_LOAD",
    # 'conf' is a templated field, so the payload can be pulled from XCom here.
    # (Jinja renders this to a string; on Airflow 2.1+ you can set
    # render_template_as_native_obj=True on the DAG to receive a real dict.)
    conf="{{ ti.xcom_pull(task_ids='get_dag_to_trigger', key='return_value') }}",
)

get_dag_to_trigger >> triggerNewDAG_tsk
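If you prefer the TaskFlow API mentioned above, a roughly equivalent sketch (get_payload and its return values are illustrative assumptions, not part of the original code):

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@task
def get_payload():
    # Build the conf dictionary here (e.g. from the S3 object metadata);
    # the keys mirror the question's payload, the values are placeholders.
    return {"filePath": "/data/incoming", "source": "s3", "fileName": "file.csv"}


triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="1000_NEW_LOAD",
    # Passing the XComArg resolves to the actual dict at runtime,
    # so no Jinja string rendering is involved.
    conf=get_payload(),
)

Because get_payload() returns an XComArg, Airflow wires the upstream dependency automatically; no explicit >> is required.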