How to create a user_defined_macro in Airflow 2.1.4
I'm using Airflow 2.1.4 (Composer), and I want a macro, based on the following function, that returns the name of a BigQuery table, i.e. project_id.dataset_name.tablename:
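from airflow.models import Variable


def table_format(datasetname, tablename, use_grave=True):
    """
    Generate a BigQuery table name using Airflow Variables
    datasetname: name of the BigQuery dataset
    tablename: name of the table
    use_grave: whether to wrap the table name in grave accents (backticks)
    """
    # Environment: we have local, prod, stage.
    # Read the Variables directly: a "{{ var.value.ENV }}" string built inside
    # a macro is returned verbatim and is not rendered a second time by Jinja.
    ds_env = Variable.get("ENV")
    # BigQuery project name
    bq_prj = Variable.get("BQ_PROJECT")
    # Only environments other than prod/staging get an "<env>_" table prefix
    # (the original "!= prod or != staging" test was always true)
    if ds_env not in ("prod", "staging"):
        ds_txt = ds_env + "_"
    else:
        ds_txt = ""  # an empty string, since None cannot be concatenated
    tbl_name = bq_prj + "." + datasetname + "." + ds_txt + tablename
    if use_grave:
        tbl_name = "`" + tbl_name + "`"
    return tbl_name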
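The naming rule itself can be checked without Airflow by passing the environment and project in as plain arguments. The sketch below assumes that split; build_table_name is a hypothetical helper, not part of the question's code. It also uses a membership test, because a condition of the form env != "prod" or env != "staging" is true for every value (no string equals both).

```python
def build_table_name(project, env, datasetname, tablename, use_grave=True):
    """Hypothetical pure version of table_format: no Airflow lookups."""
    # Only environments other than prod/staging get an "<env>_" prefix
    prefix = "" if env in ("prod", "staging") else env + "_"
    name = f"{project}.{datasetname}.{prefix}{tablename}"
    # Optionally wrap the full path in backticks for BigQuery standard SQL
    return f"`{name}`" if use_grave else name


print(build_table_name("my-project", "local", "data_crm", "interactions", False))
# my-project.data_crm.local_interactions
print(build_table_name("my-project", "prod", "data_crm", "interactions"))
# `my-project.data_crm.interactions`
```

Inside the Airflow macro, the two extra arguments would come from whatever source holds the environment and project name.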
When I use it like this:
from dag_utils.airflow_utils import table_format

with DAG(
    "crm_data_pipeline",
    schedule_interval="0 1 * * *",
    max_active_runs=1,
    concurrency=2,
    user_defined_macros={"table_name": table_format},
    ....
    load_crm_data_interaccions = BigQueryOperator(
        task_id="load_crm_data_interaccions",
        dag=dag,
        use_legacy_sql=False,
        allow_large_results=True,
        destination_dataset_table="{{table_name(data_crm,intereactions,False)}}",
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_TRUNCATE",
        sql="./sql/data_crm_interactions.sql",
    )
    ....
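A macro registered through user_defined_macros is ordinary Python called during rendering, and the string it returns is written into the output as-is rather than rendered again. The sketch below uses plain jinja2, not Airflow: it registers a hypothetical table_name_literal macro as an environment global, which is assumed to approximate how Airflow exposes user_defined_macros to templates, and shows that a "{{ var.value.BQ_PROJECT }}" placed inside a Python string comes through verbatim.

```python
from jinja2 import Environment, StrictUndefined


def table_name_literal(datasetname, tablename):
    # Hypothetical macro that, like the question's table_format, embeds
    # Jinja syntax inside the Python string it returns
    return "{{ var.value.BQ_PROJECT }}." + datasetname + "." + tablename


# Assumption: registering the macro as a global roughly mirrors what
# user_defined_macros does; this is not Airflow's own code
env = Environment(undefined=StrictUndefined)
env.globals["table_name"] = table_name_literal

rendered = env.from_string("{{ table_name('data_crm', 'intereactions') }}").render()
print(rendered)
# {{ var.value.BQ_PROJECT }}.data_crm.intereactions
```

The embedded expression is never evaluated, so inside a macro the value has to be read in Python (for example with airflow.models.Variable.get) or passed in as an argument from the template.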
I get the following error. The strange thing is that the template itself seems to render fine:
[2022-01-21 19:21:07,005] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1248, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1761, in render_templates
    self.task.render_template_fields(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 997, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1010, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/nativetypes.py", line 91, in render
    return self.environment.handle_exception()
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
    reraise(*rewrite_traceback_stack(source=source))
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
    raise value.with_traceback(tb)
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/nativetypes.py", line 22, in native_concat
    head = list(islice(nodes, 2))
  File "<template>", line 1, in top-level template code
  File "/home/airflow/gcs/plugins/dag_utils/airflow_utils.py", line 26, in table_format
    ds_txt = ds_env + "_"
jinja2.exceptions.UndefinedError: 'data_crm' is undefined
What is wrong here? Thanks for your help!
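Thanks, Bas. I was not quoting the values. Without quotes, Jinja treats data_crm and intereactions as names of template variables; neither exists in the context, so they are undefined and using them inside the macro raises the UndefinedError. The line should be:
destination_dataset_table="{{table_name('data_crm','intereactions',False)}}"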
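The difference between the unquoted and quoted calls can be reproduced with plain jinja2. This is a sketch, not Airflow itself: table_name is a hypothetical stand-in for table_format with a hard-coded project id, and StrictUndefined is assumed to match the DAG's template_undefined setting (Airflow's default, unless the DAG overrides it).

```python
from jinja2 import Environment, StrictUndefined
from jinja2.exceptions import UndefinedError


def table_name(datasetname, tablename, use_grave=True):
    # Hypothetical stand-in for table_format with a hard-coded project id
    name = "my-project." + datasetname + "." + tablename
    return "`" + name + "`" if use_grave else name


# StrictUndefined raises UndefinedError as soon as an unknown name is used
env = Environment(undefined=StrictUndefined)
env.globals["table_name"] = table_name


def render(template):
    return env.from_string(template).render()


# Unquoted: data_crm is parsed as a variable name and resolves to an
# undefined object; the error surfaces when table_name concatenates it
try:
    render("{{ table_name(data_crm, intereactions, False) }}")
except UndefinedError as exc:
    print("error:", exc)  # error: 'data_crm' is undefined

# Quoted: the arguments are plain string literals
print(render("{{ table_name('data_crm', 'intereactions', False) }}"))
# my-project.data_crm.intereactions
```

This also explains why the traceback points inside table_format: the call itself succeeds, and the undefined argument only fails when the function tries to use it.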