从 GCS 获取文件并根据文件名模式加载到 bigquery

Get file from GCS and load to bigquery based on file name pattern

我们需要根据文件名模式将文件从 gcs 存储桶加载到不同的 bigquery table。 例如

bucket/folder/test_a_20221023.csv  -> should go to table a
bucket/folder/file_a_20221023_2.csv  -> should go to table a
bucket/folder/control_b_20221023.csv  -> should go to table b
bucket/folder/test_b_20221023_2.csv  -> should go to table b

因此前缀可以是文件名中 a 或 b 模式之前的任何内容,并且基于 a 或 b 我们必须加载到 bigquery 中的特定 table。除了 gsutil 命令

gsutil ls gs://[BUCKET_NAME]/** | grep string

我强烈建议你使用 BATCH DataFlow 来做这个,也许有必要学习如何使用 Apache-beam[GCP] 或者你可以尝试使用 DEFAULT Dataflow 来做这个,但我不要认为他会尊重名称文件

您可以考虑的一种方法是使用 Python 脚本。基本上,逻辑是:

  1. List the objects 在您的存储桶或文件夹中,并根据它们的类别或组对其进行排序。
  2. Create a loop and use the load_table_from_uri function 根据步骤 1 创建的列表加载数据。

以下是基于您的用例的示例代码:

from google.cloud import storage
from google.cloud import bigquery
import re

def list_blobs_with_prefix(bucket_name, prefix, delimiter=None):

    storage_client = storage.Client()

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)

    a_list = []
    b_list = []
    for blob in blobs:
        is_a = re.search("\w+\/\w+_a_\d+\_?\d+?.csv", blob.name)
        is_b = re.search("\w+\/\w+_b_\d+\_?\d+?.csv", blob.name)
        if is_a:
            a_list.append(f'gs://{bucket_name}/{blob.name}')
        elif is_b:
            b_list.append(f'gs://{bucket_name}/{blob.name}')

    return a_list,b_list

def load_to_bq():
    bucket_name="your-bucket"
    prefix="folder/"

    table_a = "PROJECT_ID.dataset.gcs_csv_a"
    table_b = "PROJECT_ID.dataset.gcs_csv_b"

    a_list,b_list = list_blobs_with_prefix(bucket_name, prefix,delimiter="/")

    client = bigquery.Client()

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    for csv_file_a in a_list:
        load_job = client.load_table_from_uri(
                csv_file_a, table_a, job_config=job_config
                )
        load_job.result()

    for csv_file_b in b_list:
        load_job = client.load_table_from_uri(
                csv_file_b, table_b, job_config=job_config
                )
        load_job.result()

load_to_bq()

Folder structure: gs://bucket_name/folder/control_b_20221023.csv