TemplateNotFound when using Airflow's PostgresOperator with Jinja templating and SQL
I keep running into rendering failures when trying to use Airflow's templating capabilities (via Jinja2) with the PostgresOperator. It's likely I'm doing something wrong, but I'm at a loss as to what the problem might be. Here's an example that reproduces the TemplateNotFound error I've been getting:
airflow.cfg
airflow_home = /home/gregreda/airflow
dags_folder = /home/gregreda/airflow/dags
Relevant DAG and variables
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'gregreda',
    'start_date': datetime(2016, 6, 1),
    'schedule_interval': None,
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift'

dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    default_args=default_args
)
/example_csv_to_redshift/csv_to_redshift.py
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql=this_dag_path + '/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)
/example_csv_to_redshift/copy_to_redshift.sql
COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv'
CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;
Calling airflow render example_csv_to_redshift load_table 2016-06-14 throws the exception below. Note that I'm also running into this issue with another DAG, which is why you see a path mentioning example_redshift_query_to_csv in the output.
[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags
[2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
[2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render
    ti.render_templates()
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates
    rendered_content = rt(attr, content, jinja_context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template
    cache_key = self.loader.get_source(self, name)[1]
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Any ideas toward a fix are much appreciated.
Standard PEBCAK error.
The issue was how the path to the SQL template was specified within the given Airflow task: it needs to be relative. Airflow adds the directory containing the DAG file to Jinja's template search path by default (see the template_searchpath docstring quoted below), so templates are resolved relative to the DAG file, not as absolute filesystem paths.
copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)
Additionally, the SQL template needed a slight change (note the params. prefix this time):
COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv'
CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;
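The reason for the params. prefix: Airflow does not merge the operator's params into the top level of the Jinja context; it exposes them under a params key. A minimal standalone sketch of that lookup in plain Jinja2 (the credential value here is a made-up placeholder):

from jinja2 import Template

sql = "CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }}'"
# Render with a context shaped like the one Airflow builds: params is a dict
print(Template(sql).render(params={'AWS_ACCESS_KEY_ID': 'AKIA-EXAMPLE'}))
# -> CREDENTIALS 'aws_access_key_id=AKIA-EXAMPLE'

With both changes in place, airflow render example_csv_to_redshift load_table 2016-06-14 should print the fully rendered COPY statement instead of raising TemplateNotFound.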
For added control, instantiate your DAG with the template_searchpath parameter, then use just the file name in the operator:
:param template_searchpath: This list of folders (non relative)
    defines where jinja will look for your templates. Order matters.
    Note that jinja/airflow includes the path of your DAG file by
    default
:type template_searchpath: string or list of strings
As @yannicksse suggested, applying this practice to your original dag would look like this:
dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    template_searchpath=[this_dag_path],  # here
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # and here
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)
Although, personally, I'd put all the templates in a subfolder, along these lines:
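A sketch of what that layout might look like; the sql/ folder name is an illustrative choice, not anything Airflow requires:

dags/
    example_csv_to_redshift/
        csv_to_redshift.py
        sql/
            copy_to_redshift.sql

# Point Jinja at the subfolder instead of the DAG folder itself,
# then keep referencing templates by bare file name.
dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    template_searchpath=[this_dag_path + '/sql'],
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # resolved against template_searchpath
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)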