Airflow DAG 应用于数据集中的多个 BigQuery 表

Airflow DAG to apply on multiple BigQuery tables in the dataset

我有一个包含多个表的 BigQuery 数据集,我们称它们为基表或源表。一些外部应用程序正在将数据附加到这些基表,一些是周期性的,一些是偶尔的。我想要一个 Airflow DAG 查询这些 source_tables 并根据一些普遍适用于他们都。 sql 文件有一个带有 source_table.

占位符的固定查询

我的 DAG 看起来像这样:

projectId = os.environ["GCP_PROJECT"]
dataset = <target-dataset>

dag = DAG(...)

selectInsertOp = BigQueryOperator(
    ...
    sql=<sed_the_source_table_placeholder('sql_file.sql')>,
    ...
    destination_dataset_table=source_table + '_bulk'
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    dag=dag
)

selectInsertOp

由于 source_table 数量众多(数百个),我如何在不重复 DAG 文件(以及相应的 SQL 文件)的情况下实现这一点?我想要这个单个 DAG 文件创建多个 BigQueryOperator 任务。

如果您有必要的表列表,您可以像这样将运算符包装在 for 循环中。

for source_table in table_list:
    selectInsertOp = BigQueryOperator(
    ...
    sql=<sed_the_source_table_placeholder('sql_file.sql')>,
    ...
    destination_dataset_table=source_table + '_bulk'
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    dag=dag