Airflow 2.0.0+ - Pass a Dynamically Generated Dictionary to DAG Triggered by TriggerDagRunOperator

Previously, I was using the python_callable parameter of TriggerDagRunOperator to dynamically alter the dag_run_obj payload passed to the newly triggered DAG.

Since this was removed in Airflow 2.0.0 (PR: https://github.com/apache/airflow/pull/6317), is there a way to do this without creating a custom TriggerDagRunOperator?

For context, here is the flow of my code:

# Poll the Amazon S3 bucket for newly received files
fileSensor_tsk = S3KeySensor()

# Use the chooseDAGBasedOnInput function to decide which DAG to trigger.
# (Previously, python_callable was used directly in TriggerDagRunOperator
# to create the dag_run object for the newly triggered DAG.)
# The dag_run object passes the received file name details to the new DAG
# so it can complete its own work.
chooseDAGTrigger_tsk = BranchPythonOperator(
    task_id='chooseDAGTrigger_tsk',
    python_callable=chooseDAGBasedOnInput,
    provide_context=True
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id='triggerNewDAG_tsk',
    trigger_dag_id='1000_NEW_LOAD'
)

triggerNewDAG2_tsk = TriggerDagRunOperator(
    task_id='triggerNew2DAG_tsk',
    trigger_dag_id='1000_NEW2_LOAD'
) ...

Any help or commentary would be greatly appreciated!

EDIT - Adding the python_callable function previously used in the TriggerDagRunOperator:

def intakeFile(context, dag_run_obj):

    #read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get('bucket_name')
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...

    dag_run_obj.payload = {
        'filePath': workingPath,
        'source': source,
        'fileName': fileName
    }

    return dag_run_obj

The TriggerDagRunOperator now takes a conf parameter to which a dictionary can be provided as the conf object for the DagRun. The documentation on triggering DAGs has more information that you may find useful as well.
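
For example, a minimal sketch reusing the DAG id from the question (the payload values here are placeholders):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Pass a dictionary to the triggered DAG via `conf`; the triggered DAG
# can read it from dag_run.conf (e.g. dag_run.conf["fileName"])
triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="1000_NEW_LOAD",
    conf={"filePath": "/tmp/incoming", "source": "s3", "fileName": "file.csv"},
)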

EDIT

Since you need to execute a function to determine which DAG to trigger, and you don't want to create a custom TriggerDagRunOperator, you could run intakeFile() in a PythonOperator (or use the @task decorator with the TaskFlow API) and use its return value as the conf argument in the TriggerDagRunOperator. As part of Airflow 2.0, return values are automatically pushed to XCom in many operators; the PythonOperator is one of them.

The general idea would look something like this:

def intakeFile(*args, **kwargs):

    # read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get("bucket_name")
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...

    # the dag_run_obj pattern no longer exists in 2.0: push the id of the
    # DAG to trigger (determined by the elided logic above) as its own
    # XCom, and return the payload dict directly -- the PythonOperator
    # pushes the return value to XCom automatically
    kwargs["ti"].xcom_push(key="dag_to_trigger", value=dag_to_trigger)

    return {
        "filePath": workingPath,
        "source": source,
        "fileName": fileName,
    }


get_dag_to_trigger = PythonOperator(
    task_id="get_dag_to_trigger",
    python_callable=intakeFile
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="{{ ti.xcom_pull(task_ids='get_dag_to_trigger', key='dag_to_trigger') }}",
    conf="{{ ti.xcom_pull(task_ids='get_dag_to_trigger', key='return_value') }}",
)

get_dag_to_trigger >> triggerNewDAG_tsk
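
If you'd rather use the @task decorator mentioned above, a rough TaskFlow sketch of the same idea follows (the payload values are placeholders, and passing the task's XComArg straight into conf assumes conf is a templated field, which it is in current releases):

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@task
def get_file_payload():
    # stand-in for the S3 work done in intakeFile(); TaskFlow pushes
    # the returned dict to XCom automatically
    return {
        "filePath": "/tmp/incoming/file.csv",
        "source": "s3",
        "fileName": "file.csv",
    }

# Passing the XComArg returned by the task call directly into the
# templated conf field resolves it at runtime and also sets the
# upstream dependency for you.
triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="1000_NEW_LOAD",
    conf=get_file_payload(),
)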