Airflow XCom - 如何使用 TriggerDagRunOperator 在 DAG 之间共享变量?
Airflow XCom - how to share vars between DAGs using TriggerDagRunOperator?
假设我有一个 DAG 进行一些数据处理,并将此处理的结果写入 dest_path
变量。
有没有办法使用 TriggerDagRunOperator
将此变量推送到另一个 DAG
def trigger(context, dag_run_obj):
dest_path = context['ti'].xcom_pull(task_ids='download_data')
return str(dest_path)
...
trigger_next_dag = TriggerDagRunOperator(
task_id="trigger_next_dag",
trigger_dag_id="send_mined_data", # Ensure this equals the dag_id of the DAG to trigger
provide_context=True,
python_callable=trigger,
dag=dag,
)
但是这个任务失败了
AttributeError: 'str' object has no attribute 'run_id'
第二个 DAG (dag_id="send_mined_data"
) 应该以常见的方式提取此变量:
ti = kwargs['ti']
pulled_string = ti.xcom_pull(task_ids='trigger_next_dag')
根据源代码,需要针对您的用例扩展 TriggerDagRunOperator。您会看到 source code here.
您需要做的是继承此运算符并通过在调用 trigger_dag
函数之前将 trigger
函数的代码注入 execute
方法中来扩展它打电话。
假设我有一个 DAG 进行一些数据处理,并将此处理的结果写入 dest_path
变量。
有没有办法使用 TriggerDagRunOperator
def trigger(context, dag_run_obj):
dest_path = context['ti'].xcom_pull(task_ids='download_data')
return str(dest_path)
...
trigger_next_dag = TriggerDagRunOperator(
task_id="trigger_next_dag",
trigger_dag_id="send_mined_data", # Ensure this equals the dag_id of the DAG to trigger
provide_context=True,
python_callable=trigger,
dag=dag,
)
但是这个任务失败了
AttributeError: 'str' object has no attribute 'run_id'
第二个 DAG (dag_id="send_mined_data"
) 应该以常见的方式提取此变量:
ti = kwargs['ti']
pulled_string = ti.xcom_pull(task_ids='trigger_next_dag')
根据源代码,需要针对您的用例扩展 TriggerDagRunOperator。您会看到 source code here.
您需要做的是继承此运算符并通过在调用 trigger_dag
函数之前将 trigger
函数的代码注入 execute
方法中来扩展它打电话。