在 Google Cloud Composer 中使用 Airflow 模板文件和 template_searchpath
Using Airflow template files and template_searchpath in Google Cloud Composer
我在 Google Cloud Composer 上的 Airflow DAG 中广泛使用 BigQueryOperator
。
对于较长的查询,最好将每个查询放在它自己的 .sql
文件中,而不是用它弄乱 DAG。 Airflow 似乎支持所有 SQL 查询运算符,包括 BigQueryOperator,如您在 the documentation.
中所见
我的问题:在 .sql
模板文件中编写了 sql 语句后,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?
谷歌搜索并找到 。我找到了一种方法来完成这项工作(尽管它不是理想的解决方案,正如我们将要看到的那样)。这是一个包含三部分的工作示例:
- 带有一点神社模板的 sql 模板文件,
- DAG,以及
gcloud
命令需要将模板上传到正确的位置。
(1) sql 模板文件
这只是一个文本文件,其文件名以 .sql
扩展名结尾。假设此文件名为 my-templated-query.sql
并包含:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
(2) 引用 DAG 文件中的模板
要引用此模板,请创建如下运算符:
count_task = BigQueryOperator(
task_id='count_rows',
sql='/my-templated-query.sql')
(3) 将模板文件添加到 Google Cloud Composer
事实证明,默认情况下,airflow 在 dags 文件夹中查找模板文件。要将我们的模板文件上传到 dags 文件夹,我们 运行
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
您必须相应地替换环境名称、位置和源路径。
将这些模板全部上传到dag 文件夹似乎不太合适。更好的 Airflow 做法是将模板放在它们自己的文件夹中,并将 template_searchpath
参数指定为 。但是,我不确定如何使用 Google Cloud Composer 执行此操作。
更新: 我意识到可以在 DAG 文件夹中放置子文件夹,这对于组织大量 SQL 模板很有用。假设我在 BigQueryOperator 中的 DAG_FOLDER/dataset1/table1.sql
中放置了一个 SQL 模板文件,然后我可以使用 sql=/dataset1/table1.sql
引用它。如果你有一个包含很多文件的子文件夹和很多其他子文件夹,你也可以使用我上面显示的 dag import
递归上传整个子文件夹——只需将它指向子文件夹。
我们最近使用类似的策略解决了这个问题。步骤是:
- 将所有 SQL 文件放入 Google Cloud Source Repository
- 在每个 DAG 的开头 运行,将文件克隆到自动与您的 Airflow 环境共享的 Cloud Storage Bucket 中的“data”目录中。
- 使用
BigQueryOperator
中的模板在执行时读取查询。
这是一个最小的解决方案:
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator
with models.DAG(
'bigquery_dag',
schedule_interval = None ,
template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
default_args = default_dag_args
) as dag:
t1_clean_repo = bash_operator.BashOperator(
task_id = 'clean_repo',
bash_command = 'rm -rf /home/airflow/gcs/data/repo'
)
clone_command = """
gcloud source repos clone repo --project=project_id
cp -R repo /home/airflow/gcs/data
"""
t2_clone_repo = bash_operator.BashOperator(
task_id='clone_repo',
bash_command=clone_command
)
t3_query = bigquery_operator.BigQueryOperator(
task_id='query',
sql= 'query.sql',
use_legacy_sql = False,
bigquery_conn_id='conn_id'
)
我们在这里利用了一些重要的概念:
- Cloud Storage Bucket 中的数据目录通过 Fuse 自动与您的 Airflow 实例共享。大多数操作员都可以访问此处放置的任何内容。
- 只要您的 Google Cloud Source 存储库与 Cloud Composer 在同一个项目中,您的 Airflow 实例就不需要
git clone
文件的额外权限。
- 我们正在 DAG 参数中设置
template_searchpath
,扩展搜索范围以包括云存储桶中的 data
目录。
我找到了解决这个问题的理想方法。在您的 dag 声明中,您可以设置 template_searchpath
这是 Airflow 查找 jinja 模板文件的默认路径。
为了在您的 Cloud Composer 实例中进行此操作,您必须按以下方式进行设置
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)
请注意,我在此示例中使用了插件文件夹。您可以改用您的数据文件夹或您希望存储桶中的任何文件夹。
我在 Google Cloud Composer 上的 Airflow DAG 中广泛使用 BigQueryOperator
。
对于较长的查询,最好将每个查询放在它自己的 .sql
文件中,而不是用它弄乱 DAG。 Airflow 似乎支持所有 SQL 查询运算符,包括 BigQueryOperator,如您在 the documentation.
我的问题:在 .sql
模板文件中编写了 sql 语句后,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?
谷歌搜索并找到
- 带有一点神社模板的 sql 模板文件,
- DAG,以及
gcloud
命令需要将模板上传到正确的位置。
(1) sql 模板文件
这只是一个文本文件,其文件名以 .sql
扩展名结尾。假设此文件名为 my-templated-query.sql
并包含:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
(2) 引用 DAG 文件中的模板 要引用此模板,请创建如下运算符:
count_task = BigQueryOperator(
task_id='count_rows',
sql='/my-templated-query.sql')
(3) 将模板文件添加到 Google Cloud Composer 事实证明,默认情况下,airflow 在 dags 文件夹中查找模板文件。要将我们的模板文件上传到 dags 文件夹,我们 运行
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
您必须相应地替换环境名称、位置和源路径。
将这些模板全部上传到dag 文件夹似乎不太合适。更好的 Airflow 做法是将模板放在它们自己的文件夹中,并将 template_searchpath
参数指定为
更新: 我意识到可以在 DAG 文件夹中放置子文件夹,这对于组织大量 SQL 模板很有用。假设我在 BigQueryOperator 中的 DAG_FOLDER/dataset1/table1.sql
中放置了一个 SQL 模板文件,然后我可以使用 sql=/dataset1/table1.sql
引用它。如果你有一个包含很多文件的子文件夹和很多其他子文件夹,你也可以使用我上面显示的 dag import
递归上传整个子文件夹——只需将它指向子文件夹。
我们最近使用类似的策略解决了这个问题。步骤是:
- 将所有 SQL 文件放入 Google Cloud Source Repository
- 在每个 DAG 的开头 运行,将文件克隆到自动与您的 Airflow 环境共享的 Cloud Storage Bucket 中的“data”目录中。
- 使用
BigQueryOperator
中的模板在执行时读取查询。
这是一个最小的解决方案:
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator
with models.DAG(
'bigquery_dag',
schedule_interval = None ,
template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
default_args = default_dag_args
) as dag:
t1_clean_repo = bash_operator.BashOperator(
task_id = 'clean_repo',
bash_command = 'rm -rf /home/airflow/gcs/data/repo'
)
clone_command = """
gcloud source repos clone repo --project=project_id
cp -R repo /home/airflow/gcs/data
"""
t2_clone_repo = bash_operator.BashOperator(
task_id='clone_repo',
bash_command=clone_command
)
t3_query = bigquery_operator.BigQueryOperator(
task_id='query',
sql= 'query.sql',
use_legacy_sql = False,
bigquery_conn_id='conn_id'
)
我们在这里利用了一些重要的概念:
- Cloud Storage Bucket 中的数据目录通过 Fuse 自动与您的 Airflow 实例共享。大多数操作员都可以访问此处放置的任何内容。
- 只要您的 Google Cloud Source 存储库与 Cloud Composer 在同一个项目中,您的 Airflow 实例就不需要
git clone
文件的额外权限。 - 我们正在 DAG 参数中设置
template_searchpath
,扩展搜索范围以包括云存储桶中的data
目录。
我找到了解决这个问题的理想方法。在您的 dag 声明中,您可以设置 template_searchpath
这是 Airflow 查找 jinja 模板文件的默认路径。
为了在您的 Cloud Composer 实例中进行此操作,您必须按以下方式进行设置
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)
请注意,我在此示例中使用了插件文件夹。您可以改用您的数据文件夹或您希望存储桶中的任何文件夹。