Airflow XCom - 如何使用 TriggerDagRunOperator 在 DAG 之间共享变量?

Airflow XCom - how to share vars between DAGs using TriggerDagRunOperator?

假设我有一个 DAG 进行一些数据处理,并将此处理的结果写入 dest_path 变量。 有没有办法使用 TriggerDagRunOperator

将此变量推送到另一个 DAG
def trigger(context, dag_run_obj):
    dest_path = context['ti'].xcom_pull(task_ids='download_data')
    return str(dest_path)

...
    trigger_next_dag = TriggerDagRunOperator(
        task_id="trigger_next_dag",
        trigger_dag_id="send_mined_data",  # Ensure this equals the dag_id of the DAG to trigger
        provide_context=True,
        python_callable=trigger,
        dag=dag,
    )

但是这个任务失败了 AttributeError: 'str' object has no attribute 'run_id'

第二个 DAG (dag_id="send_mined_data") 应该以常见的方式提取此变量:

    ti = kwargs['ti']
    pulled_string = ti.xcom_pull(task_ids='trigger_next_dag')

根据源代码,需要针对您的用例扩展 TriggerDagRunOperator。您会看到 source code here.

您需要做的是继承此运算符并通过在调用 trigger_dag 函数之前将 trigger 函数的代码注入 execute 方法中来扩展它打电话。