将多个文件从云存储加载到不同表中的大查询
Loading multiple file from cloud storage to big query in different tables
我是 GCP 的新手,我可以从我的 VM 中将 1 个文件导入 GCS,然后将其传输到 bigquery。
如何将多个文件从 GCS 传输到 Bigquery。我知道通配符 URi 是它的解决方案,但下面的代码还需要进行哪些其他更改?
def hello_gcs(event, context):
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "test_project.test_dataset.test_Table"
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://test_bucket/*.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print(f"Processing file: {file['name']}.")
因为可能有多个上传所以我不能定义具体的 table 名称或文件名?是否可以自动执行此任务?
每当 GCS 存储桶中有新文件时,PubSub 就会触发此函数。
谢谢
如果您的数据源是 GCS 而您的目的地是 BQ,您可以使用 BigQuery Data Transfer Service 将您的数据 ETL 到 BQ。每个传输作业都针对某个 Table,如果您想使用流模式在某个 Table 中追加或覆盖数据,您可以 select。
您也可以安排此作业。每日、每周等
如果我错了请纠正我,我知道你的云函数是由 finalize
事件 (Google Cloud Storage Triggers) 触发的,当一个新文件(或对象)出现在存储桶中时。这意味着桶中的每个“新”对象都有一个事件。因此,至少为每个对象调用一次云函数。
上面的 link 有一个来自 event
字典的数据示例。那里有大量信息,包括要加载的对象(文件)的详细信息。
例如,您可能希望在文件名模式和目标 BigQuery table 之间进行一些映射以进行数据加载。使用该地图,您将能够决定应该使用哪个 table 进行加载。或者您可能有一些其他机制来选择目标 table.
其他一些需要考虑的事情:
- 异常处理 - 如果
数据未加载(出于任何原因)?谁以及如何被告知?
要做什么(更正源数据或目标table
和)重复加载等
- 如果加载比云函数花费更多时间会发生什么
超时(目前最大 540 秒)?
- 如果有多个云函数会怎样
来自一个
finalize
事件的调用,或来自不同事件但
来自语义相同的源文件(重复数据、重复、
等等)
不要回答我,这种情况你还没做过就想想吧
要将多个文件从 GCS 传输到 Bigquery,您可以简单地遍历所有文件。下面是带有注释的工作代码示例。
我相信 event
和 context
(函数参数)默认由 Google 云函数处理,因此无需修改该部分。或者您可以通过利用 event
而不是循环来简化代码。
def hello_gcs(event, context):
import re
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
bq_client = bigquery.Client()
bucket = storage.Client().bucket("bucket-name")
for blob in bucket.list_blobs(prefix="folder-name/"):
if ".csv" in blob.name: #Checking for csv blobs as list_blobs also returns folder_name
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
csv_filename = re.findall(r".*/(.*).csv",blob.name) #Extracting file name for BQ's table id
bq_table_id = "project-name.dataset-name."+csv_filename[0] # Determining table name
try: #Check if the table already exists and skip uploading it.
bq_client.get_table(bq_table_id)
print("Table {} already exists. Not uploaded.".format(bq_table_id))
except NotFound: #If table is not found, upload it.
uri = "gs://bucket-name/"+blob.name
print(uri)
load_job = bq_client.load_table_from_uri(
uri, bq_table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = bq_client.get_table(bq_table_id) # Make an API request.
print("Table {} uploaded.".format(bq_table_id))
要在单个 Cloud Function 调用中将多个 GCS 文件加载到多个 BQ table,您需要列出这些文件,然后迭代它们,为每个文件创建一个加载作业,就像你已经完成了一个。但是在单个函数调用中完成所有这些工作,有点破坏了使用 Cloud Functions 的目的。
如果您的要求不强制您这样做,您可以利用 Cloud Functions 的强大功能,让每个文件在添加到存储桶后触发单个 CF,因为它是一个事件驱动函数.请参考https://cloud.google.com/functions/docs/writing/background#cloud-storage-example。每次有指定的 activity 时都会触发它,其中会有事件元数据。
因此,在您的应用程序中,我们可以获取触发事件的文件的名称,然后仅将该文件加载到 bigquery table 中,而不是获取 URI 中的整个存储桶内容,如下面的代码示例。
以下是解决代码中问题的方法。尝试在您的代码中进行以下更改。
您可以从云函数事件字典中提取有关事件的详细信息和有关触发事件的文件的详细信息。在您的情况下,我们可以将文件名作为事件 ['name'] 并更新“uri”变量。
生成一个新的唯一的table_id(这里以table_id与文件名相同为例)。您可以根据需要使用其他方案生成唯一的文件名。
参考下面的代码
def hello_gcs(event, context):
from google.cloud import bigquery
client = bigquery.Client() # Construct a BigQuery client object.
print(f"Processing file: {event['name']}.") #name of the file which triggers the function
if ".csv" in event['name']:
# bq job config
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
file_name = event['name'].split('.')
table_id = "<project_id>.<dataset_name>."+file_name[0] #[generating new id for each table]
uri = "gs://<bucket_name>/"+event['name']
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print("Table {} uploaded.".format(table_id))
我是 GCP 的新手,我可以从我的 VM 中将 1 个文件导入 GCS,然后将其传输到 bigquery。 如何将多个文件从 GCS 传输到 Bigquery。我知道通配符 URi 是它的解决方案,但下面的代码还需要进行哪些其他更改?
def hello_gcs(event, context):
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "test_project.test_dataset.test_Table"
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://test_bucket/*.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print(f"Processing file: {file['name']}.")
因为可能有多个上传所以我不能定义具体的 table 名称或文件名?是否可以自动执行此任务?
每当 GCS 存储桶中有新文件时,PubSub 就会触发此函数。 谢谢
如果您的数据源是 GCS 而您的目的地是 BQ,您可以使用 BigQuery Data Transfer Service 将您的数据 ETL 到 BQ。每个传输作业都针对某个 Table,如果您想使用流模式在某个 Table 中追加或覆盖数据,您可以 select。
您也可以安排此作业。每日、每周等
如果我错了请纠正我,我知道你的云函数是由 finalize
事件 (Google Cloud Storage Triggers) 触发的,当一个新文件(或对象)出现在存储桶中时。这意味着桶中的每个“新”对象都有一个事件。因此,至少为每个对象调用一次云函数。
上面的 link 有一个来自 event
字典的数据示例。那里有大量信息,包括要加载的对象(文件)的详细信息。
例如,您可能希望在文件名模式和目标 BigQuery table 之间进行一些映射以进行数据加载。使用该地图,您将能够决定应该使用哪个 table 进行加载。或者您可能有一些其他机制来选择目标 table.
其他一些需要考虑的事情:
- 异常处理 - 如果 数据未加载(出于任何原因)?谁以及如何被告知? 要做什么(更正源数据或目标table 和)重复加载等
- 如果加载比云函数花费更多时间会发生什么 超时(目前最大 540 秒)?
- 如果有多个云函数会怎样
来自一个
finalize
事件的调用,或来自不同事件但 来自语义相同的源文件(重复数据、重复、 等等)
不要回答我,这种情况你还没做过就想想吧
要将多个文件从 GCS 传输到 Bigquery,您可以简单地遍历所有文件。下面是带有注释的工作代码示例。
我相信 event
和 context
(函数参数)默认由 Google 云函数处理,因此无需修改该部分。或者您可以通过利用 event
而不是循环来简化代码。
def hello_gcs(event, context):
import re
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
bq_client = bigquery.Client()
bucket = storage.Client().bucket("bucket-name")
for blob in bucket.list_blobs(prefix="folder-name/"):
if ".csv" in blob.name: #Checking for csv blobs as list_blobs also returns folder_name
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
csv_filename = re.findall(r".*/(.*).csv",blob.name) #Extracting file name for BQ's table id
bq_table_id = "project-name.dataset-name."+csv_filename[0] # Determining table name
try: #Check if the table already exists and skip uploading it.
bq_client.get_table(bq_table_id)
print("Table {} already exists. Not uploaded.".format(bq_table_id))
except NotFound: #If table is not found, upload it.
uri = "gs://bucket-name/"+blob.name
print(uri)
load_job = bq_client.load_table_from_uri(
uri, bq_table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = bq_client.get_table(bq_table_id) # Make an API request.
print("Table {} uploaded.".format(bq_table_id))
要在单个 Cloud Function 调用中将多个 GCS 文件加载到多个 BQ table,您需要列出这些文件,然后迭代它们,为每个文件创建一个加载作业,就像你已经完成了一个。但是在单个函数调用中完成所有这些工作,有点破坏了使用 Cloud Functions 的目的。
如果您的要求不强制您这样做,您可以利用 Cloud Functions 的强大功能,让每个文件在添加到存储桶后触发单个 CF,因为它是一个事件驱动函数.请参考https://cloud.google.com/functions/docs/writing/background#cloud-storage-example。每次有指定的 activity 时都会触发它,其中会有事件元数据。
因此,在您的应用程序中,我们可以获取触发事件的文件的名称,然后仅将该文件加载到 bigquery table 中,而不是获取 URI 中的整个存储桶内容,如下面的代码示例。
以下是解决代码中问题的方法。尝试在您的代码中进行以下更改。
您可以从云函数事件字典中提取有关事件的详细信息和有关触发事件的文件的详细信息。在您的情况下,我们可以将文件名作为事件 ['name'] 并更新“uri”变量。
生成一个新的唯一的table_id(这里以table_id与文件名相同为例)。您可以根据需要使用其他方案生成唯一的文件名。
参考下面的代码
def hello_gcs(event, context):
from google.cloud import bigquery
client = bigquery.Client() # Construct a BigQuery client object.
print(f"Processing file: {event['name']}.") #name of the file which triggers the function
if ".csv" in event['name']:
# bq job config
job_config = bigquery.LoadJobConfig(
autodetect=True,
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
file_name = event['name'].split('.')
table_id = "<project_id>.<dataset_name>."+file_name[0] #[generating new id for each table]
uri = "gs://<bucket_name>/"+event['name']
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print("Table {} uploaded.".format(table_id))