如何在气流 2.1.4 中创建 user_defined_macro

How to create a user_defined_macro in airflow 2.1.4

我正在使用 Airflow 2.1.4(Composer)并基于以下功能,我想要一个宏 returns BigQuery 中 table 的名称,即。 project_id.dataset_name.tablename

def table_format(datasetname, tablename, use_grave=True):
    """
    Generate table name using env variables
    datasetname: name of datas set for BigQuery
    tablename: name of table
    use_grave: Boolean to use grave accent at BigQuery table
    """
    # Environment, we have local, prod, stage
    ds_env = "{{ var.value.ENV }}"

    # BigQuery Project Name
    bq_prj = "{{ var.value.BQ_PROJECT }}"

    if ds_env != "prod" or ds_env != "staging":
        ds_txt = ds_env + "_"
    else:
        ds_txt = None

    tbl_name = bq_prj + "." + datasetname + "." + ds_txt + tablename

    if use_grave is True:
        tbl_name = "`" + tbl_name + "`"
    return tbl_name

当我把它用作

from dag_utils.airflow_utils import table_format

with DAG(
    "crm_data_pipeline",
    schedule_interval="0 1 * * *",
    max_active_runs=1,
    concurrency=2,
    user_defined_macros={"table_name": table_format},
   ....

    load_crm_data_interaccions = BigQueryOperator(
        task_id="load_crm_data_interaccions",
        dag=dag,
        use_legacy_sql=False,
        allow_large_results=True,
        destination_dataset_table="{{table_name(data_crm,intereactions,False)}}",
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_TRUNCATE",
        sql="./sql/data_crm_interactions.sql",
    )


....

我收到以下错误。奇怪的是模板的渲染效果很好

[2022-01-21 19:21:07,005] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1248, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1761, in render_templates
    self.task.render_template_fields(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 997, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1010, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/nativetypes.py", line 91, in render
    return self.environment.handle_exception()
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
    reraise(*rewrite_traceback_stack(source=source))
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
    raise value.with_traceback(tb)
  File "/opt/python3.8/lib/python3.8/site-packages/jinja2/nativetypes.py", line 22, in native_concat
    head = list(islice(nodes, 2))
  File "<template>", line 1, in top-level template code
  File "/home/airflow/gcs/plugins/dag_utils/airflow_utils.py", line 26, in table_format
    ds_txt = ds_env + "_"
jinja2.exceptions.UndefinedError: 'data_crm' is undefined

有什么问题吗?感谢您的帮助!

感谢 Bas。我没有引用这些值。应该是

dataset_table="{{table_name('data_crm','intereactions',False)}}"