Passing params through Jinja templates in SubDagOperator in Airflow 2
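Airflow 2: I push an XCom from taskA and try to pull it in taskB, which runs inside a subdag, but I have not been able to pull the value. How can I do this?
The same code pulled the XCom correctly in Airflow 1.10 (where I also passed provide_context=True, which was removed in Airflow 2), but in Airflow 2 it fails with an error like:
jinja2.exceptions.UndefinedError: 'DAG_ID' is undefined
The code is as follows: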
    from airflow import DAG
    from airflow.operators.subdag import SubDagOperator

    DAG_ID = 'test_dag_name'

    dag = DAG(
        DAG_ID,
        default_args=default_args, catchup=False,
        max_active_runs=1, schedule_interval=SCHEDULE_INTERVAL
    )

    with dag:
        test_bq = SubDagOperator(
            task_id='test_task_id',
            subdag=subdag_func(parent_dag_name=DAG_ID,
                               child_dag_name='test_task_id',
                               args=default_args,
                               run_date=run_date,
                               # This template is the one that fails: DAG_ID is plain text inside the string
                               max_dt="{{ ti.xcom_pull(dag_id=DAG_ID ,task_ids='task_id_test')[0][0]}}"))
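Note: if I pass the exact DAG name as a string literal to xcom_pull instead of DAG_ID, it works fine. It fails only when I pass the variable (dag_id=DAG_ID).
Also, if I call xcom_pull(dag_id=None), since dag_id is optional according to the source code, a different error appears: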
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1248, in _prepare_and_execute_task_with_callbacks
self.render_templates(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1761, in render_templates
self.task.render_template_fields(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 997, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1010, in _do_render_template_fields
rendered_content = self.render_template(content, context, jinja_env, seen_oids)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
return jinja_env.from_string(content).render(**context)
File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 1090, in render
self.environment.handle_exception()
File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
reraise(*rewrite_traceback_stack(source=source))
File "/opt/python3.8/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
raise value.with_traceback(tb)
File "<template>", line 68, in top-level template code
File "/opt/python3.8/lib/python3.8/site-packages/jinja2/sandbox.py", line 384, in getitem
return obj[argument]
jinja2.exceptions.UndefinedError: None has no element 0
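I was able to work around this by passing the parent DAG name in the following form:
"{{ ti.xcom_pull(dag_id='" + DAG_ID + "',task_ids='task_id_test')[0][0] }}"
When dag_id=DAG_ID is written inside the template, the parent DAG name is never resolved, but when it is passed as dag_id='" + DAG_ID + "' the problem goes away.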
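A likely explanation for why the concatenated form works: the template is an ordinary Python string, so DAG_ID written inside the quotes is never substituted by Python, and Jinja later looks for a variable named DAG_ID in the render context, which does not contain module-level Python variables. The sketch below only builds the strings (it does not import Airflow or render anything) to show what Jinja would receive in each case. The variable names literal_template, concat_template and fstring_template are made up for illustration, and the f-string variant is an alternative I am assuming is equivalent, not something from the original code.

```python
DAG_ID = "test_dag_name"

# DAG_ID is just text inside the string here; Python does not substitute it,
# so Jinja would later see an unknown variable named DAG_ID.
literal_template = "{{ ti.xcom_pull(dag_id=DAG_ID, task_ids='task_id_test')[0][0] }}"

# Concatenation inserts the value of DAG_ID, wrapped in quotes, before Jinja
# ever sees the string.
concat_template = (
    "{{ ti.xcom_pull(dag_id='" + DAG_ID + "', task_ids='task_id_test')[0][0] }}"
)

# Hypothetical alternative: an f-string. Doubled braces ({{{{ and }}}}) are
# needed so that literal {{ and }} survive f-string formatting.
fstring_template = (
    f"{{{{ ti.xcom_pull(dag_id='{DAG_ID}', task_ids='task_id_test')[0][0] }}}}"
)

print(literal_template)
print(concat_template)
print(fstring_template)
```

The last two strings are identical and contain the literal DAG name, which matches the case in the question where passing the exact name works.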