BigQueryOperator 运行 可以多个 SQL 文件吗?
Can BigQueryOperator run multiple SQL files?
我有一些 SQL 由 airflow BigQueryOperator 触发,SQL 之一对于以下情况下的所有任务都很常见:
SQL 到 运行 - common.sql
, abc.sql
, xyz.sql
- 任务 1 -
common.sql + abc.sql
- 任务 2 -
common.sql + xyz.sql
为了完成一项任务 运行 2 SQL,我将 2 SQL 个文件读入一个字符串,然后 运行 将合并后的字符串 运行 一口气完成任务
代码如下所示:
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
with open ('abc.sql', "r") as sqlfile:
abc_array = sqlfile.readlines()
# at this point, sql_script has all codes from common.sql and abc.sql
sql_script = ''.join(common_array) + '\n' + ''.join(abc_array)
BigQueryOperator(task_id='task1', sql=sql_script)
这符合我的目的,还有其他更优雅的方法吗?
您可以创建一个包含文件的列表,而不是逐个文件地手动阅读。如果您有多个文件,这将很有帮助。对于每个文件读取内容,将其加入 common_array
并为每个加入的公共文件和 sql 文件创建一个键值对。
用于测试 test1.sql、test2.sql、test3.sql 和 common.sql 包含单行查询。
import json
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
file_arr = ['test1.sql','test2.sql','test3.sql']
sql_dict = {}
for data in file_arr:
with open (data, "r") as sqlfile:
key = data.split('.')[0]
value = ''.join(common_array) + '\n' + ''.join(sqlfile.read().rstrip())
sql_dict[key] = value
print(json.dumps(sql_dict, sort_keys=False, indent=2))
print("\nDictionary value for sql_dict['test1']: \n"+ sql_dict['test1'])
输出:
{
"test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
"test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
"test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"
}
Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10
SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10
然后您可以使用字典中的值 BigQueryOperator
。
BigQueryOperator(task_id='task1', sql=sql_dict['test1'])
我有一些 SQL 由 airflow BigQueryOperator 触发,SQL 之一对于以下情况下的所有任务都很常见:
SQL 到 运行 - common.sql
, abc.sql
, xyz.sql
- 任务 1 -
common.sql + abc.sql
- 任务 2 -
common.sql + xyz.sql
为了完成一项任务 运行 2 SQL,我将 2 SQL 个文件读入一个字符串,然后 运行 将合并后的字符串 运行 一口气完成任务
代码如下所示:
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
with open ('abc.sql', "r") as sqlfile:
abc_array = sqlfile.readlines()
# at this point, sql_script has all codes from common.sql and abc.sql
sql_script = ''.join(common_array) + '\n' + ''.join(abc_array)
BigQueryOperator(task_id='task1', sql=sql_script)
这符合我的目的,还有其他更优雅的方法吗?
您可以创建一个包含文件的列表,而不是逐个文件地手动阅读。如果您有多个文件,这将很有帮助。对于每个文件读取内容,将其加入 common_array
并为每个加入的公共文件和 sql 文件创建一个键值对。
用于测试 test1.sql、test2.sql、test3.sql 和 common.sql 包含单行查询。
import json
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
file_arr = ['test1.sql','test2.sql','test3.sql']
sql_dict = {}
for data in file_arr:
with open (data, "r") as sqlfile:
key = data.split('.')[0]
value = ''.join(common_array) + '\n' + ''.join(sqlfile.read().rstrip())
sql_dict[key] = value
print(json.dumps(sql_dict, sort_keys=False, indent=2))
print("\nDictionary value for sql_dict['test1']: \n"+ sql_dict['test1'])
输出:
{
"test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
"test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
"test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"
}
Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10
SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10
然后您可以使用字典中的值 BigQueryOperator
。
BigQueryOperator(task_id='task1', sql=sql_dict['test1'])