将查询结果保存在 Cloud Storage 中的 BigQuery Table 中

Save the result of a query in a BigQuery Table, in Cloud Storage

我想知道将 Google BigQuery table 查询的结果存储到 Google 云存储的最佳方式是什么。我的代码目前在某些 Jupyter Notebook 中 运行(在 Vertex AI Workbench 中,与 BigQuery 数据源和云存储目标相同的项目)如下所示:

# CELL 1 OF 2

from google.cloud import bigquery
bqclient = bigquery.Client()

# The query string can vary:
query_string = """
        SELECT *  
        FROM `my_project-name.my_db.my_table` 
        LIMIT 2000000
        """

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(
        create_bqstorage_client=True,
    )
)
print("Dataframe shape: ", dataframe.shape)

# CELL 2 OF 2:

import pandas as pd
dataframe.to_csv('gs://my_bucket/test_file.csv', index=False)

此代码大约需要 7.5 分钟才能成功完成。

是否有更优化的方法来实现上面所做的事情?(这意味着 更快,但也许还有其他方法改进了)。

一些补充说明:

  1. 我想 运行“通过 Jupyter 笔记本”(在 Vertex AI Workbench 中),因为有时必须进行一些数据预处理或特殊过滤,这不能通过SQL 个查询。
  2. 对于代码的第一部分,我放弃了 pandas.read_gbq,因为当(实验性地)“存储为 .CSV 并回读”时,它给我一些奇怪的 EOF 错误。
  3. 直觉上,我会将优化工作集中在代码的后半部分 (CELL 2 OF 2),因为第一个部分是从 the official Google documentation. I have tried but it does not work, however in the same thread 选项中借用的,工作正常。
  4. 很可能此代码之后会包含在某些 Docker 图像中,因此必须使用“尽可能少的库”。

谢谢。

在这个 link 中,您将找到完成此任务的方法: https://cloud.google.com/bigquery/docs/samples/bigquery-extract-table?hl=en

不过,有几点需要注意。

  • 这只是一个摘录,但如果您要进行一些转换,您可以使用 Dataflow 或 Composer,最后一个允许您使用 SQL 使用 BQ 作业的转换来调整您的内容想。另一方面,Dataflow 使用 python 来处理代码并创建作业。

  • 此外,您可能还需要关注您的 bq table 性能、分区和集群细节 https://cloud.google.com/bigquery/docs/clustered-tables ;我还注意到您正在使用 limit 语句,这不会对您的性能产​​生影响,因为此 BQ 是柱状的,因此您仍在扫描所有列和所有数据。

片段:

# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'
project = "bigquery-public-data"
dataset_id = "samples"
table_id = "shakespeare"

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)

希望对您有所帮助:)

经过一些实验,我想我已经找到了我原来的 post 的解决方案。首先,更新代码:

import pandas as pd  # Just one library is imported this time

# This SQL query can vary, modify it to match your needs
query_string = """
SELECT *
FROM `my_project.my_db.my_table`
LIMIT 2000000
"""

# One liner to query BigQuery data.
downloaded_dataframe = pd.read_gbq(query_string, dialect='standard', use_bqstorage_api=True)

# Data processing (OPTIONAL, modify it to match your needs)
# I won't do anything this time, just upload the previously queried data

# Data store in GCS
downloaded_dataframe.to_csv('gs://my_bucket/uploaded_data.csv', index=False)

一些最后的说明:

  1. 我还没有对处理速度与 BigQuery table 中存在的行数进行“in-depth 研究”,但是我看到更新代码的处理时间和原始查询,现在大约需要 6 分钟;暂时就够了。 因此,这个答案可能还有进一步改进的空间,但比原来的情况要好。
  2. 我在原文 post 中提到的 EOF 错误是:ParserError: Error tokenizing data. C error: EOF inside string starting at row 70198。最后我意识到它与 pandas_gbq 函数没有任何关系,而是与“我如何保存数据”有关。看,我 'experimentally' 将 .csv 文件存储在 Vertex AI Workbench 本地存储中,然后将其下载到我的本地设备,然后尝试从我的本地设备打开该数据,我一直绊倒那个错误,但是从 Cloud Storage 下载 .csv 数据时却没有得到相同的结果......为什么?好吧,如果您在“生成”后(即几秒钟后)“非常快”地从 Vertex AI Workbench 本地存储下载 .csv 数据,那么数据仍然不完整,但它不会给出任何错误或警告消息:它只会“让您开始下载”。出于这个原因,我认为将数据导出到 Cloud Storage,然后从那里安全下载会更安全。这种行为在大文件上更为明显(即我自己生成的文件,大小约为 3.1GB)。

希望对您有所帮助。

谢谢。

使用EXPORT DATA语句:


EXPORT DATA OPTIONS(
  uri='gs://bucket/folder/*.csv',
  format='CSV',
  overwrite=true,
  header=true,
  field_delimiter=';') AS
SELECT *
FROM `my_project.my_db.my_table`
LIMIT 2000000

  • URI 中的 * 允许将一个 table 导出到多个 table。这仅在导出的 table 大于 1GB (See here)
  • 时才有意义

直接在 BQ 控制台中通过命令行或在您的 python 脚本中执行。

from google.cloud import bigquery

client = bigquery.Client()

query_job = client.query(
    """
    EXPORT DATA OPTIONS(
      uri='gs://bucket/folder/*.csv',
      format='CSV',
      overwrite=true,
      header=true,
      field_delimiter=';') AS
    SELECT *
    FROM `my_project.my_db.my_table`
    LIMIT 2000000
  """
)

results = query_job.result()  # Waits for job to complete.

关于 LIMIT 的注释:请注意 LIMIT 不会减少 non-clustered table 中读取的数据量。只有返回的内容。 (See here)