使用数组结构将 DataFrame 上传到 BigQuery

Uploading DataFrame to BigQuery with Array structure

我有一个包含 3 列的 pandas DataFrame:col1 包含列表,col2 包含字典,col3 包含 NaN:

dict_ = {'col1': [['abc'], ['def', 'ghi'], []],
         'col2': [{'k1': 'v1', 'k2': 'v2'},
                  {'k1': 'v3', 'k2': 'v4'},
                  {'k1': 'v5', 'k2': 'v6'}],
         'col3': [np.nan, np.nan, np.nan]}
df = pd.DataFrame(dict_)

将 DataFrame 上传到 BigQuery 我为第一列和第二列创建了以下架构:

schema = [
bigquery.SchemaField(name="col1", field_type="STRING", mode='REPEATED'),
bigquery.SchemaField(name="col2", field_type="RECORD", mode='NULLABLE',
                     fields=[bigquery.SchemaField(name="k1", field_type="STRING", mode='NULLABLE'),
                             bigquery.SchemaField(name="k2", field_type="STRING", mode='NULLABLE')])
]
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE", schema=schema)
job = client.load_table_from_dataframe(df, table, job_config=job_config)
job.result()

DataFrame 已上传,但 col1 为空。

Table 预览:

我应该怎么做才能解决这个问题?

BigQuery Python 客户端库中的 load_table_from_dataframe 方法将 DataFrame 序列化为 Parquet。不幸的是,BigQuery 后端对数组数据类型的支持有限。

作为解决方法,我推荐 insert_rows_from_dataframe 方法。

import pandas as pd
import numpy as np
from google.cloud import bigquery


dict_ = {'col1': [['abc'], ['def', 'ghi'], []],
         'col2': [{'k1': 'v1', 'k2': 'v2'},
                  {'k1': 'v3', 'k2': 'v4'},
                  {'k1': 'v5', 'k2': 'v6'}],
         'col3': [np.nan, np.nan, np.nan]}
df = pd.DataFrame(dict_)

client = bigquery.Client()

schema = [
    bigquery.SchemaField(name="col1", field_type="STRING", mode='REPEATED'),
    bigquery.SchemaField(name="col2", field_type="RECORD", mode='NULLABLE',
                     fields=[bigquery.SchemaField(name="k1", field_type="STRING", mode='NULLABLE'),
                             bigquery.SchemaField(name="k2", field_type="STRING", mode='NULLABLE')])
]
table = bigquery.Table(
    "my-project.my_dataset.Whosebug66054651",
    schema=schema
)
client.create_table(table)

errors = client.insert_rows_from_dataframe(table, df)
for chunk in errors:
    print(f"encountered {len(chunk)} errors: {chunk}")

loaded_df = client.query(
    # Use a query so that data is read from streaming buffer.
    "SELECT * FROM `my-project.my_dataset.Whosebug66054651`"
).to_dataframe()
print(loaded_df)

资源: