如何获得 apache 气流以使用 Jinja 渲染 Hive HQL ${variables}

How to get apache airflow to render Hive HQL ${variables} with Jinja

这似乎在您传入带有 ${xxx} 变量的 HQL 脚本时得到支持,并且在实际执行模板渲染的阶段之前将其预处理以将它们转换为 {{xxx}} Jinja 样式然后用用户提供的字典中的值替换它们。我相信这是因为 HiveOperator class:

中有这样的函数
def prepare_template(self):
    if self.hiveconf_jinja_translate:
        self.hql = re.sub(
            "($\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
    if self.script_begin_tag and self.script_begin_tag in self.hql:
        self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])

问题是我不知道如何触发这段代码在模板渲染阶段之前被调用。我有一个像这样的基本 DAG 脚本:

from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, timedelta

default_args = dict(
    owner='mpetronic',
    depends_on_past=False,
    start_date=datetime(2017, 5, 2),
    verbose=True,
    retries=1,
    retry_delay=timedelta(minutes=5)
    )

dag = DAG(
    dag_id='report',
    schedule_interval='* * * * *',
    user_defined_macros=dict(a=1, b=2),
    default_args=default_args)

hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read()

task = HiveOperator(
    task_id='report_builder',
    hive_cli_conn_id='hive_dv',
    schema='default',
    mapred_job_name='report_builder',
    hiveconf_jinja_translate=True,
    dag=dag,
    hql=hql)

我可以看到我的 user_defined_macros 字典在代码中与全局 jinja 上下文字典合并,然后应用于我的 HQL 脚本以将其呈现为模板。但是,因为我的 HQL 是本机 HQL,所以我要更新的所有变量都是 ${xxx} 的形式,jinja 会跳过它们。我需要气流先调用 prepare_template() 但不知道如何实现。

我意识到我可以手动将我的 HQL ${xxx} 更改为 {{xxx}},因为这可行,但这似乎是一种反模式。我希望脚本能够在本机或通过气流运行。这是 TaskInstance class 中的函数,它会呈现我手动更改的 {{xxx}} 值:

def render_templates(self):
    task = self.task
    jinja_context = self.get_template_context()
    if hasattr(self, 'task') and hasattr(self.task, 'dag'):
        if self.task.dag.user_defined_macros:
            jinja_context.update(
                self.task.dag.user_defined_macros)

    rt = self.task.render_template  # shortcut to method
    for attr in task.__class__.template_fields:
        content = getattr(task, attr)
        if content:
            rendered_content = rt(attr, content, jinja_context)
            setattr(task, attr, rendered_content)

我解决了我的问题。这是上述方法中使用的正则表达式:

($\{([ a-zA-Z0-9_]*)\}) 

它不考虑以下形式的直线变量:

${hivevar:var_name} 

它不考虑模式中的冒号。该形式是使用直线在命名空间内定义 Hive 变量的更标准方式。要使这个 Jinja 替换起作用,您必须仅使用 ${var_name} 在 HQL 中引用变量,但您只能使用

在直线中定义变量:
set hivevar:var_name=123;

我认为 Airflow 应该完全支持 hivevar:var_name 风格的命名空间变量,当你 运行 直线时,直线是与 Hive 一起使用的首选客户端。