如何从 BigQuery 读取 Google 存储桶中的每个文件并为存储桶中的每个文件创建一个 table?
How to read every file in a Google Storage bucket from BigQuery and create one table for each file in the bucket?
我有一个包含 28 个文件的存储桶,我想将它们加载到不同的 table 中(存储桶中每个文件一个 table)。
存储桶中的每个文件如下所示:
file_1_2020.csv
file_2_2020.csv
..
file_28_2020.csv
我希望 table 具有文件名的前 6 个字符 + _table(它们都是带有 headers 的 csv)。例如,file_1_table
.
我在一篇博客中读到,我可以通过创建一个列出存储桶中所有文件的联合查询来实现这一点。然后我可以遍历所有名称并创建我需要的 tables。问题是我在 GCP 上阅读的所有文档都是关于云中的联合查询 SQL,所以我真的不知道如何让 BigQuery 读取我的存储桶。
我怎样才能做到这一点?它是否通过联合查询完成并不重要(但我想知道,因为了解和学习新事物总是好的)。另外,我想按日期时间对 tables 进行分区和聚类,这是每个文件的第一列(每个文件的模式是 datetime:TIMESTAMP,col1:FLOAT,col2:FLOAT
)
您可以使用 EXECUTE IMMEDIATE
来实现:
第 1 步:创建外部 table 以获取存储桶中的文件列表
CREATE EXTERNAL TABLE mydataset.mytable OPTIONS (format = 'CSV', uris = ['gs://bucket_name/*.csv']);
步骤 2: 使用 EXECUTE IMMEDIATE 动态创建 tables
DECLARE FILE_LIST ARRAY<STRING>;
DECLARE TABLE_NAME STRING;
DECLARE I INT64 DEFAULT 1;
DECLARE CNT INT64 DEFAULT 0;
SET FILE_LIST = ARRAY(SELECT DISTINCT _FILE_NAME as FILENAME FROM mydataset.mytable);
SET CNT = ARRAY_LENGTH(FILE_LIST);
WHILE I <= CNT
DO
SET TABLE_NAME = CONCAT(SUBSTR(REPLACE(SUBSTR(FILE_LIST[ORDINAL(i)], INSTR(FILE_LIST[ORDINAL(i)], '/', -1) + 1) ,'.csv', ''), 1, 6), '_table');
EXECUTE IMMEDIATE "CREATE EXTERNAL TABLE mydataset." || TABLE_NAME || " OPTIONS (format = 'CSV', uris = ['" || FILE_LIST[ORDINAL(I)] || "'])";
SET I = I + 1;
END WHILE;
对于此用例,您可以利用 Cloud Workflows 新的 Google 云产品。
简而言之,Workflows 允许您将服务连接在一起,任何具有 public API 的服务。集成任何 Google 云 API、SaaS API 或私有 API。
本质上,您需要将 Cloud Storage 与 BigQuery 连接才能加载文件。
您需要在工作流中定义以下步骤:
- 使用云存储 API 从您的 bucket/folder
中获取文件
- 从文件名中提取片段,例如:12 和 2020 到变量中供以后使用
- 使用 BigQuery Load API 发送在第一步中获得的输入 sourceURI,将步骤 2 中的段组合为您的目标名称格式
有一个完整的博客post我有关于这个:Using Cloud Workflows to load Cloud Storage files into BigQuery
我有一个包含 28 个文件的存储桶,我想将它们加载到不同的 table 中(存储桶中每个文件一个 table)。
存储桶中的每个文件如下所示:
file_1_2020.csv
file_2_2020.csv
..
file_28_2020.csv
我希望 table 具有文件名的前 6 个字符 + _table(它们都是带有 headers 的 csv)。例如,file_1_table
.
我在一篇博客中读到,我可以通过创建一个列出存储桶中所有文件的联合查询来实现这一点。然后我可以遍历所有名称并创建我需要的 tables。问题是我在 GCP 上阅读的所有文档都是关于云中的联合查询 SQL,所以我真的不知道如何让 BigQuery 读取我的存储桶。
我怎样才能做到这一点?它是否通过联合查询完成并不重要(但我想知道,因为了解和学习新事物总是好的)。另外,我想按日期时间对 tables 进行分区和聚类,这是每个文件的第一列(每个文件的模式是 datetime:TIMESTAMP,col1:FLOAT,col2:FLOAT
)
您可以使用 EXECUTE IMMEDIATE
来实现:
第 1 步:创建外部 table 以获取存储桶中的文件列表
CREATE EXTERNAL TABLE mydataset.mytable OPTIONS (format = 'CSV', uris = ['gs://bucket_name/*.csv']);
步骤 2: 使用 EXECUTE IMMEDIATE 动态创建 tables
DECLARE FILE_LIST ARRAY<STRING>;
DECLARE TABLE_NAME STRING;
DECLARE I INT64 DEFAULT 1;
DECLARE CNT INT64 DEFAULT 0;
SET FILE_LIST = ARRAY(SELECT DISTINCT _FILE_NAME as FILENAME FROM mydataset.mytable);
SET CNT = ARRAY_LENGTH(FILE_LIST);
WHILE I <= CNT
DO
SET TABLE_NAME = CONCAT(SUBSTR(REPLACE(SUBSTR(FILE_LIST[ORDINAL(i)], INSTR(FILE_LIST[ORDINAL(i)], '/', -1) + 1) ,'.csv', ''), 1, 6), '_table');
EXECUTE IMMEDIATE "CREATE EXTERNAL TABLE mydataset." || TABLE_NAME || " OPTIONS (format = 'CSV', uris = ['" || FILE_LIST[ORDINAL(I)] || "'])";
SET I = I + 1;
END WHILE;
对于此用例,您可以利用 Cloud Workflows 新的 Google 云产品。
简而言之,Workflows 允许您将服务连接在一起,任何具有 public API 的服务。集成任何 Google 云 API、SaaS API 或私有 API。
本质上,您需要将 Cloud Storage 与 BigQuery 连接才能加载文件。
您需要在工作流中定义以下步骤:
- 使用云存储 API 从您的 bucket/folder 中获取文件
- 从文件名中提取片段,例如:12 和 2020 到变量中供以后使用
- 使用 BigQuery Load API 发送在第一步中获得的输入 sourceURI,将步骤 2 中的段组合为您的目标名称格式
有一个完整的博客post我有关于这个:Using Cloud Workflows to load Cloud Storage files into BigQuery