如何获得 apache 气流以使用 Jinja 渲染 Hive HQL ${variables}
How to get apache airflow to render Hive HQL ${variables} with Jinja
这似乎在您传入带有 ${xxx} 变量的 HQL 脚本时得到支持,并且在实际执行模板渲染的阶段之前将其预处理以将它们转换为 {{xxx}} Jinja 样式然后用用户提供的字典中的值替换它们。我相信这是因为 HiveOperator class:
中有这样的函数
def prepare_template(self):
if self.hiveconf_jinja_translate:
self.hql = re.sub(
"($\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
if self.script_begin_tag and self.script_begin_tag in self.hql:
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
问题是我不知道如何触发这段代码在模板渲染阶段之前被调用。我有一个像这样的基本 DAG 脚本:
from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, timedelta
default_args = dict(
owner='mpetronic',
depends_on_past=False,
start_date=datetime(2017, 5, 2),
verbose=True,
retries=1,
retry_delay=timedelta(minutes=5)
)
dag = DAG(
dag_id='report',
schedule_interval='* * * * *',
user_defined_macros=dict(a=1, b=2),
default_args=default_args)
hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read()
task = HiveOperator(
task_id='report_builder',
hive_cli_conn_id='hive_dv',
schema='default',
mapred_job_name='report_builder',
hiveconf_jinja_translate=True,
dag=dag,
hql=hql)
我可以看到我的 user_defined_macros 字典在代码中与全局 jinja 上下文字典合并,然后应用于我的 HQL 脚本以将其呈现为模板。但是,因为我的 HQL 是本机 HQL,所以我要更新的所有变量都是 ${xxx} 的形式,jinja 会跳过它们。我需要气流先调用 prepare_template() 但不知道如何实现。
我意识到我可以手动将我的 HQL ${xxx} 更改为 {{xxx}},因为这可行,但这似乎是一种反模式。我希望脚本能够在本机或通过气流运行。这是 TaskInstance class 中的函数,它会呈现我手动更改的 {{xxx}} 值:
def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
if self.task.dag.user_defined_macros:
jinja_context.update(
self.task.dag.user_defined_macros)
rt = self.task.render_template # shortcut to method
for attr in task.__class__.template_fields:
content = getattr(task, attr)
if content:
rendered_content = rt(attr, content, jinja_context)
setattr(task, attr, rendered_content)
我解决了我的问题。这是上述方法中使用的正则表达式:
($\{([ a-zA-Z0-9_]*)\})
它不考虑以下形式的直线变量:
${hivevar:var_name}
它不考虑模式中的冒号。该形式是使用直线在命名空间内定义 Hive 变量的更标准方式。要使这个 Jinja 替换起作用,您必须仅使用 ${var_name}
在 HQL 中引用变量,但您只能使用
在直线中定义变量:
set hivevar:var_name=123;
我认为 Airflow 应该完全支持 hivevar:var_name
风格的命名空间变量,当你 运行 直线时,直线是与 Hive 一起使用的首选客户端。
这似乎在您传入带有 ${xxx} 变量的 HQL 脚本时得到支持,并且在实际执行模板渲染的阶段之前将其预处理以将它们转换为 {{xxx}} Jinja 样式然后用用户提供的字典中的值替换它们。我相信这是因为 HiveOperator class:
中有这样的函数def prepare_template(self):
if self.hiveconf_jinja_translate:
self.hql = re.sub(
"($\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
if self.script_begin_tag and self.script_begin_tag in self.hql:
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
问题是我不知道如何触发这段代码在模板渲染阶段之前被调用。我有一个像这样的基本 DAG 脚本:
from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, timedelta
default_args = dict(
owner='mpetronic',
depends_on_past=False,
start_date=datetime(2017, 5, 2),
verbose=True,
retries=1,
retry_delay=timedelta(minutes=5)
)
dag = DAG(
dag_id='report',
schedule_interval='* * * * *',
user_defined_macros=dict(a=1, b=2),
default_args=default_args)
hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read()
task = HiveOperator(
task_id='report_builder',
hive_cli_conn_id='hive_dv',
schema='default',
mapred_job_name='report_builder',
hiveconf_jinja_translate=True,
dag=dag,
hql=hql)
我可以看到我的 user_defined_macros 字典在代码中与全局 jinja 上下文字典合并,然后应用于我的 HQL 脚本以将其呈现为模板。但是,因为我的 HQL 是本机 HQL,所以我要更新的所有变量都是 ${xxx} 的形式,jinja 会跳过它们。我需要气流先调用 prepare_template() 但不知道如何实现。
我意识到我可以手动将我的 HQL ${xxx} 更改为 {{xxx}},因为这可行,但这似乎是一种反模式。我希望脚本能够在本机或通过气流运行。这是 TaskInstance class 中的函数,它会呈现我手动更改的 {{xxx}} 值:
def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
if self.task.dag.user_defined_macros:
jinja_context.update(
self.task.dag.user_defined_macros)
rt = self.task.render_template # shortcut to method
for attr in task.__class__.template_fields:
content = getattr(task, attr)
if content:
rendered_content = rt(attr, content, jinja_context)
setattr(task, attr, rendered_content)
我解决了我的问题。这是上述方法中使用的正则表达式:
($\{([ a-zA-Z0-9_]*)\})
它不考虑以下形式的直线变量:
${hivevar:var_name}
它不考虑模式中的冒号。该形式是使用直线在命名空间内定义 Hive 变量的更标准方式。要使这个 Jinja 替换起作用,您必须仅使用 ${var_name}
在 HQL 中引用变量,但您只能使用
set hivevar:var_name=123;
我认为 Airflow 应该完全支持 hivevar:var_name
风格的命名空间变量,当你 运行 直线时,直线是与 Hive 一起使用的首选客户端。