Template_searchpath 在 Airflow 中给出 TemplateNotFound 错误,找不到 SQL 脚本

Template_searchpath gives TemplateNotFound error in Airflow and cannot find the SQL script

我有一个这样描述的 DAG :

tmpl_search_path = '/home/airflow/gcs/sql_requests/'

with DAG(dag_id='pipeline', default_args=default_args, template_searchpath = [tmpl_search_path]) as dag:

    create_table = bigquery_operator.BigQueryOperator(
        task_id = 'create_table',
        sql = 'create_table.sql',
        use_legacy_sql = False,
        destination_dataset_table = some_table)
    )

任务 create_table 调用 SQL 脚本 create_table.sql。此 SQL 脚本与 DAG 文件夹不在同一文件夹中:它位于与 DAG 文件夹同一级别的 sql_requests 文件夹中。 这是 GCP Composer 存储桶内的架构(即 Google Airflow)是:

bucket_name
|- airflow.cfg
|- dags
   |_ pipeline.py
|- ...
|_ sql_requests
   |_ create_table.sql

我需要为 template_searchpath 设置什么路径以引用 GCP 上 Airflow 存储桶内的文件夹 sql_requests

我试过 template_searchpath= ['/home/airflow/gcs/sql_requests']template_searchpath= ['../sql_requests']template_searchpath= ['/sql_requests'],但其中 none 有效。

我得到的错误信息是'jinja2.exceptions.TemplateNotFound'

我相信默认情况下,操作员会在 DAG 文件夹中查找 sql 个文件,因此您可以将 SQL 放入文件夹

gs://composer-bucket-name/dags/sql/create_table.sql

然后引用为

sql = '/sql/create_table.sql'

如果这不起作用,请尝试不带前导 /(我不确定您是否需要)

编辑

如果您想将它们放在存储桶根目录的文件夹中,请尝试

sql = '../sql/create_table.sql'

根据 https://cloud.google.com/composer/docs/concepts/cloud-storage,无法将执行 dag 所需的文件存储在文件夹 dagsplugins 之外的其他位置:

To avoid a workflow failure, store your DAGs, plugins, and Python modules in the dags/ or plugins/ folders—even if your Python modules do not contain DAGs or plugins.

这就是我遇到 TemplateNotFound 错误的原因。

您可以在 mounted/known 路径中存储 dags/plugins 或数据

data 文件夹没有容量限制,但使用它来存储网络服务器需要读取的任何内容很容易让自己迷失方向,因为网络服务器无法访问该文件夹(例如,如果您将 SQL 文件在 /data 文件夹中,您将无法在 UI 中解析呈现的模板,但任何需要在 运行 期间访问该文件的任务都可以 运行 )

将“sql_requests”文件夹更改为“dag”文件夹,这样您的代码将如下所示:

tmpl_search_path = '/home/airflow/dags/sql_requests/'  
with DAG(dag_id='pipeline', default_args=default_args, template_searchpath = [tmpl_search_path]) as dag:      
    create_table = bigquery_operator.BigQueryOperator(
        task_id = 'create_table',
        sql = 'create_table.sql',
        use_legacy_sql = False,
        destination_dataset_table = some_table
    )
)

对我来说,它有效!