在 Airflow 2 的 SubDagOperator 中通过 jinja 模板传递参数

Passing params through jinja templates in SubDagOperator in Airflow 2

Airflow 2:我已经从 taskA 推送了一个 xcom,并且正在将那个 xcom 拉到 subdag taskB 中。我一直无法拉出必要的 xcom。我想知道如何做到这一点。

我能够在 Airflow 1.10 中使用相同的代码提取必要的 xcom(另外我通过了 provide_context=True,在 Airflow 2 中被删除),但它在 Airflow 2 中失败并显示错误消息如:

jinja2.exceptions.UndefinedError: 'DAG_ID' is undefined

代码如下:

DAG_ID = 'test_dag_name' 
dag = DAG(
    DAG_ID,
    default_args=default_args, catchup=False,
    max_active_runs=1, schedule_interval=SCHEDULE_INTERVAL
)

with dag:
   test_bq = SubDagOperator(
    task_id='test_task_id',
    subdag=subdag_func(parent_dag_name=DAG_ID,
                                      child_dag_name='test_task_id',
                                      args=default_args,
                                      run_date=run_date,
                                      max_dt="{{ ti.xcom_pull(dag_id=DAG_ID ,task_ids='task_id_test')[0][0]}}")) 

注意: 如果在 xcom_pull 中传递确切的 dag 名称代替 DAG_ID,它工作正常。但是当传递参数(dag_id=DAG_ID)时,它失败了。

此外,如果 xcom_pull(dag_id=None) 因为 dag_id 在源代码 link 中是可选的,则会弹出新错误:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1248, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1761, in render_templates
    self.task.render_template_fields(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 997, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1010, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 1090, in render
    self.environment.handle_exception()
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
    reraise(*rewrite_traceback_stack(source=source))
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
    raise value.with_traceback(tb)
  File "<template>", line 68, in top-level template code
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/sandbox.py", line 384, in getitem
    return obj[argument]
jinja2.exceptions.UndefinedError: None has no element 0```

我可以通过以指定格式传递父 dag 名称来解决此问题: "{{ ti.xcom_pull(dag_id='" + DAG_ID + "',task_ids='task_id_test')[0][0] }}" 。 当传递 dag_id=DAG_ID 时,父 dag 名称未被访问,但当作为 dag_id='" + DAG_ID + "' 传递时,问题解决了。