xcom_pull 在动态任务生成功能中不起作用
xcom_pull is not working in dynamic task generation function
我有一个动态创建子任务的函数,我在其中读取 xcom_pull
的值但出现错误:
File "/home/airflow/gcs/recon_nik_v6.py", line 168, in create_audit_task
my_dict=kwargs["ti"].xcom_pull(task_ids='accept_input_from_cli', key='my_ip_dict')
KeyError: 'ti'
如果我在另一个函数中使用相同的 my_dict=kwargs["ti"].xcom_pull(task_ids='accept_input_from_cli', key='my_ip_dict')
代码那么它可以工作,但是在这个动态部分它不工作。
S与您的其他问题类似(并在 slack 中多次解释)。这不是 Airflow 的工作方式。
XCom 拉取和任务实例仅在执行 DAG 运行 时可用。当您创建 DAG 结构(即动态生成 DAG)时,您不能使用它们。
只有执行任务时的任务实例才能访问它们,这已经是解析DAG并建立DAG结构之后的很长时间了。
所以你想做的事根本不可能。
我有一个动态创建子任务的函数,我在其中读取 xcom_pull
的值但出现错误:
File "/home/airflow/gcs/recon_nik_v6.py", line 168, in create_audit_task
my_dict=kwargs["ti"].xcom_pull(task_ids='accept_input_from_cli', key='my_ip_dict')
KeyError: 'ti'
如果我在另一个函数中使用相同的 my_dict=kwargs["ti"].xcom_pull(task_ids='accept_input_from_cli', key='my_ip_dict')
代码那么它可以工作,但是在这个动态部分它不工作。
S与您的其他问题类似(并在 slack 中多次解释)。这不是 Airflow 的工作方式。
XCom 拉取和任务实例仅在执行 DAG 运行 时可用。当您创建 DAG 结构(即动态生成 DAG)时,您不能使用它们。
只有执行任务时的任务实例才能访问它们,这已经是解析DAG并建立DAG结构之后的很长时间了。
所以你想做的事根本不可能。