BigQueryOperator 运行 可以多个 SQL 文件吗?

Can BigQueryOperator run multiple SQL files?

我有一些 SQL 由 airflow BigQueryOperator 触发,SQL 之一对于以下情况下的所有任务都很常见:

SQL 到 运行 - common.sql, abc.sql, xyz.sql

为了完成一项任务 运行 2 SQL,我将 2 SQL 个文件读入一个字符串,然后 运行 将合并后的字符串 运行 一口气完成任务

代码如下所示:

with open ('common.sql', "r") as sqlfile:
     common_array = sqlfile.readlines()

with open ('abc.sql', "r") as sqlfile:
     abc_array    = sqlfile.readlines()
            
# at this point, sql_script has all codes from common.sql and abc.sql
sql_script = ''.join(common_array) + '\n' + ''.join(abc_array)

BigQueryOperator(task_id='task1', sql=sql_script)

这符合我的目的,还有其他更优雅的方法吗?

您可以创建一个包含文件的列表,而不是逐个文件地手动阅读。如果您有多个文件,这将很有帮助。对于每个文件读取内容,将其加入 common_array 并为每个加入的公共文件和 sql 文件创建一个键值对。

用于测试 test1.sql、test2.sql、test3.sql 和 common.sql 包含单行查询。

import json
with open ('common.sql', "r") as sqlfile:
     common_array = sqlfile.readlines()

file_arr = ['test1.sql','test2.sql','test3.sql']
sql_dict = {}
for data in file_arr:
    with open (data, "r") as sqlfile:
        key = data.split('.')[0]
        value = ''.join(common_array) + '\n' + ''.join(sqlfile.read().rstrip())
        sql_dict[key] = value

print(json.dumps(sql_dict, sort_keys=False, indent=2))

print("\nDictionary value for sql_dict['test1']: \n"+ sql_dict['test1'])

输出:

{
  "test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
  "test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
  "test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"
}

Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10

SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10

然后您可以使用字典中的值 BigQueryOperator

BigQueryOperator(task_id='task1', sql=sql_dict['test1'])