Airflow DAG 应用于数据集中的多个 BigQuery 表
Airflow DAG to apply on multiple BigQuery tables in the dataset
我有一个包含多个表的 BigQuery 数据集,我们称它们为基表或源表。一些外部应用程序正在将数据附加到这些基表,一些是周期性的,一些是偶尔的。我想要一个 Airflow DAG 查询这些 source_table
s 并根据一些普遍适用于他们都。 sql 文件有一个带有 source_table
.
占位符的固定查询
我的 DAG 看起来像这样:
projectId = os.environ["GCP_PROJECT"]
dataset = <target-dataset>
dag = DAG(...)
selectInsertOp = BigQueryOperator(
...
sql=<sed_the_source_table_placeholder('sql_file.sql')>,
...
destination_dataset_table=source_table + '_bulk'
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
dag=dag
)
selectInsertOp
由于 source_table
数量众多(数百个),我如何在不重复 DAG 文件(以及相应的 SQL 文件)的情况下实现这一点?我想要这个单个 DAG 文件创建多个 BigQueryOperator 任务。
如果您有必要的表列表,您可以像这样将运算符包装在 for 循环中。
for source_table in table_list:
selectInsertOp = BigQueryOperator(
...
sql=<sed_the_source_table_placeholder('sql_file.sql')>,
...
destination_dataset_table=source_table + '_bulk'
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
dag=dag
我有一个包含多个表的 BigQuery 数据集,我们称它们为基表或源表。一些外部应用程序正在将数据附加到这些基表,一些是周期性的,一些是偶尔的。我想要一个 Airflow DAG 查询这些 source_table
s 并根据一些普遍适用于他们都。 sql 文件有一个带有 source_table
.
我的 DAG 看起来像这样:
projectId = os.environ["GCP_PROJECT"]
dataset = <target-dataset>
dag = DAG(...)
selectInsertOp = BigQueryOperator(
...
sql=<sed_the_source_table_placeholder('sql_file.sql')>,
...
destination_dataset_table=source_table + '_bulk'
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
dag=dag
)
selectInsertOp
由于 source_table
数量众多(数百个),我如何在不重复 DAG 文件(以及相应的 SQL 文件)的情况下实现这一点?我想要这个单个 DAG 文件创建多个 BigQueryOperator 任务。
如果您有必要的表列表,您可以像这样将运算符包装在 for 循环中。
for source_table in table_list:
selectInsertOp = BigQueryOperator(
...
sql=<sed_the_source_table_placeholder('sql_file.sql')>,
...
destination_dataset_table=source_table + '_bulk'
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
dag=dag