Apache Airflow 如何将 xcom_pull() 值转化为 DAG?
Apache Airflow How to xcom_pull() value into a DAG?
我有一个自定义运算符,它按如下方式推送 XCOM 值:
...
task_instance = context['task_instance']
task_instance.xcom_push("list_of_files",file_list)
...
它工作正常。我有一个 dag 定义文件 (my_dag.py),我在其中使用自己的运算符创建任务,它推送 XCOM 值,然后我想通过使用此 xcom 值在循环中执行。怎么拉?
您无法在 dag 中访问 XCOM 变量,它只能通过向运算符构造函数提供 provide_context=True
参数在运算符中使用。
如果您想在 DAG 结构本身中使用来自运算符的数据,则需要执行运算符在运算符之外执行的实际任务。
def get_file_list():
hook = SomeHook()
hook.run('something to get file list')
dag = DAG('tutorial', default_args=default_args)
for file in get_file_list():
task = SomeOperator(params={'file': file}) # Do something with the file passed as a parameter
从 dag 本身而不是从 dag 中的任务访问 xcom 通常是不好的做法。也就是说,有时这是必要的。例如动态创建dags时可能需要这样做
这是我在 dag 中提取一些未运行的作业的示例。我在 subdag 的上下文中使用它,所以我可以放心 xcom 将始终包含假设方法为 运行.
的信息
xcom_unrun_jobs = None
if len(parent_dag.get_active_runs()) > 0:
tis = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
xcom_unrun_jobs = tis.xcom_pull(dag_id=parent_dag._dag_id, task_ids=unrun_job_task_id)
我有一个自定义运算符,它按如下方式推送 XCOM 值:
...
task_instance = context['task_instance']
task_instance.xcom_push("list_of_files",file_list)
...
它工作正常。我有一个 dag 定义文件 (my_dag.py),我在其中使用自己的运算符创建任务,它推送 XCOM 值,然后我想通过使用此 xcom 值在循环中执行。怎么拉?
您无法在 dag 中访问 XCOM 变量,它只能通过向运算符构造函数提供 provide_context=True
参数在运算符中使用。
如果您想在 DAG 结构本身中使用来自运算符的数据,则需要执行运算符在运算符之外执行的实际任务。
def get_file_list():
hook = SomeHook()
hook.run('something to get file list')
dag = DAG('tutorial', default_args=default_args)
for file in get_file_list():
task = SomeOperator(params={'file': file}) # Do something with the file passed as a parameter
从 dag 本身而不是从 dag 中的任务访问 xcom 通常是不好的做法。也就是说,有时这是必要的。例如动态创建dags时可能需要这样做
这是我在 dag 中提取一些未运行的作业的示例。我在 subdag 的上下文中使用它,所以我可以放心 xcom 将始终包含假设方法为 运行.
的信息 xcom_unrun_jobs = None
if len(parent_dag.get_active_runs()) > 0:
tis = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
xcom_unrun_jobs = tis.xcom_pull(dag_id=parent_dag._dag_id, task_ids=unrun_job_task_id)