从 GCS 获取文件并根据文件名模式加载到 bigquery
Get file from GCS and load to bigquery based on file name pattern
我们需要根据文件名模式将文件从 gcs 存储桶加载到不同的 bigquery table。
例如
bucket/folder/test_a_20221023.csv -> should go to table a
bucket/folder/file_a_20221023_2.csv -> should go to table a
bucket/folder/control_b_20221023.csv -> should go to table b
bucket/folder/test_b_20221023_2.csv -> should go to table b
因此前缀可以是文件名中 a 或 b 模式之前的任何内容,并且基于 a 或 b 我们必须加载到 bigquery 中的特定 table。除了 gsutil 命令
gsutil ls gs://[BUCKET_NAME]/** | grep string
我强烈建议你使用 BATCH DataFlow 来做这个,也许有必要学习如何使用 Apache-beam[GCP] 或者你可以尝试使用 DEFAULT Dataflow 来做这个,但我不要认为他会尊重名称文件
您可以考虑的一种方法是使用 Python 脚本。基本上,逻辑是:
- List the objects 在您的存储桶或文件夹中,并根据它们的类别或组对其进行排序。
- Create a loop and use the
load_table_from_uri
function 根据步骤 1 创建的列表加载数据。
以下是基于您的用例的示例代码:
from google.cloud import storage
from google.cloud import bigquery
import re
def list_blobs_with_prefix(bucket_name, prefix, delimiter=None):
storage_client = storage.Client()
# Note: Client.list_blobs requires at least package version 1.17.0.
blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
a_list = []
b_list = []
for blob in blobs:
is_a = re.search("\w+\/\w+_a_\d+\_?\d+?.csv", blob.name)
is_b = re.search("\w+\/\w+_b_\d+\_?\d+?.csv", blob.name)
if is_a:
a_list.append(f'gs://{bucket_name}/{blob.name}')
elif is_b:
b_list.append(f'gs://{bucket_name}/{blob.name}')
return a_list,b_list
def load_to_bq():
bucket_name="your-bucket"
prefix="folder/"
table_a = "PROJECT_ID.dataset.gcs_csv_a"
table_b = "PROJECT_ID.dataset.gcs_csv_b"
a_list,b_list = list_blobs_with_prefix(bucket_name, prefix,delimiter="/")
client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
for csv_file_a in a_list:
load_job = client.load_table_from_uri(
csv_file_a, table_a, job_config=job_config
)
load_job.result()
for csv_file_b in b_list:
load_job = client.load_table_from_uri(
csv_file_b, table_b, job_config=job_config
)
load_job.result()
load_to_bq()
Folder structure: gs://bucket_name/folder/control_b_20221023.csv
我们需要根据文件名模式将文件从 gcs 存储桶加载到不同的 bigquery table。 例如
bucket/folder/test_a_20221023.csv -> should go to table a
bucket/folder/file_a_20221023_2.csv -> should go to table a
bucket/folder/control_b_20221023.csv -> should go to table b
bucket/folder/test_b_20221023_2.csv -> should go to table b
因此前缀可以是文件名中 a 或 b 模式之前的任何内容,并且基于 a 或 b 我们必须加载到 bigquery 中的特定 table。除了 gsutil 命令
gsutil ls gs://[BUCKET_NAME]/** | grep string
我强烈建议你使用 BATCH DataFlow 来做这个,也许有必要学习如何使用 Apache-beam[GCP] 或者你可以尝试使用 DEFAULT Dataflow 来做这个,但我不要认为他会尊重名称文件
您可以考虑的一种方法是使用 Python 脚本。基本上,逻辑是:
- List the objects 在您的存储桶或文件夹中,并根据它们的类别或组对其进行排序。
- Create a loop and use the
load_table_from_uri
function 根据步骤 1 创建的列表加载数据。
以下是基于您的用例的示例代码:
from google.cloud import storage
from google.cloud import bigquery
import re
def list_blobs_with_prefix(bucket_name, prefix, delimiter=None):
storage_client = storage.Client()
# Note: Client.list_blobs requires at least package version 1.17.0.
blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
a_list = []
b_list = []
for blob in blobs:
is_a = re.search("\w+\/\w+_a_\d+\_?\d+?.csv", blob.name)
is_b = re.search("\w+\/\w+_b_\d+\_?\d+?.csv", blob.name)
if is_a:
a_list.append(f'gs://{bucket_name}/{blob.name}')
elif is_b:
b_list.append(f'gs://{bucket_name}/{blob.name}')
return a_list,b_list
def load_to_bq():
bucket_name="your-bucket"
prefix="folder/"
table_a = "PROJECT_ID.dataset.gcs_csv_a"
table_b = "PROJECT_ID.dataset.gcs_csv_b"
a_list,b_list = list_blobs_with_prefix(bucket_name, prefix,delimiter="/")
client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
for csv_file_a in a_list:
load_job = client.load_table_from_uri(
csv_file_a, table_a, job_config=job_config
)
load_job.result()
for csv_file_b in b_list:
load_job = client.load_table_from_uri(
csv_file_b, table_b, job_config=job_config
)
load_job.result()
load_to_bq()
Folder structure: gs://bucket_name/folder/control_b_20221023.csv