Error inserting data into a BigQuery table

I'm trying to insert data from a pandas DataFrame into a GBQ table, but I get an "invalid data" error. The GBQ table has the following schema:

Field name    Type    Mode
id            STRING  REQUIRED
order_id      STRING  REQUIRED
action_date   DATE    NULLABLE
product_name  STRING  NULLABLE
order_sum     FLOAT   NULLABLE
website_id    STRING  NULLABLE
website_name  STRING  NULLABLE
webmaster_id  STRING  NULLABLE
webmaster     STRING  NULLABLE

The DataFrame has this structure:

id         order_id  action_date  product_name  order_sum  website_id  website_name  webmaster_id  webmaster_name
830339411  970561    2022-02-25   product_1     1000.0     123         site 1        456           webmaster 1
830339412  970562    2022-02-25   product_2     1500.0     120         site 2        456           webmaster 1

And these data types:

column        type
id            object
order_id      object
action_date   object
product_name  object
order_sum     float64
website_id    object
website_name  object
webmaster_id  object
webmaster     object

Originally the action_date column had values like 2022-02-25T20:31:02, but I converted them to 2022-02-25:

all_orders['action_time'] = pd.to_datetime(df['action_time'])
all_orders['action_date'] = all_orders['action_time'].dt.date

When I try to insert some rows into GBQ, I get this error:

'errors': [{'reason': 'invalid', 'location': 'action_date', 'debugInfo': '', 'message': "Invalid date: '1644019200000'"}]

It seems GBQ treats the dates in the action_date column as Unix timestamps. How can I fix this?
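As a quick check of that reading, the value in the error message can be decoded with the standard library. This is only a sketch: it assumes the number is milliseconds since the Unix epoch in UTC, which the error message itself does not state.

```python
from datetime import datetime, timezone

# Value copied from the error message in the question.
raw_value = 1644019200000

# Assumption: the number is milliseconds since 1970-01-01 UTC.
decoded = datetime.fromtimestamp(raw_value / 1000, tz=timezone.utc)

# Print the calendar date that the number corresponds to under that assumption.
print(decoded.date().isoformat())  # 2022-02-05
```

Under that assumption the value lands exactly on midnight of a calendar day, which is consistent with a date being serialized as a number rather than as a 'YYYY-MM-DD' string.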

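You don't need to convert the 2022-02-25T20:31:02 format to a date before writing. Parse the column as a datetime and write it to the table as follows: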

dataframe['action_time'] = pandas.to_datetime(dataframe['action_time'], infer_datetime_format=True)
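To see why the parsed column behaves differently from the one in the question, here is a minimal sketch on a hypothetical one-row frame (the `orders` name and its contents are made up for illustration). It contrasts the datetime column with the object column produced by `.dt.date`, and also shows ISO strings, which may be an option if the rows are sent as JSON rather than through a load job.

```python
import pandas

# Hypothetical sample frame using the timestamp format from the question.
orders = pandas.DataFrame({"action_time": ["2022-02-25T20:31:02"]})

# Parsing yields a datetime64 column, a real temporal dtype.
orders["action_time"] = pandas.to_datetime(orders["action_time"])
print(orders["action_time"].dtype)

# .dt.date produces plain Python date objects stored in an object column,
# which matches the "object" dtype listed for action_date in the question.
orders["action_date"] = orders["action_time"].dt.date
print(orders["action_date"].dtype)  # object

# ISO-formatted strings are unambiguous when rows are serialized as JSON.
orders["action_date_str"] = orders["action_time"].dt.strftime("%Y-%m-%d")
print(orders["action_date_str"].tolist())  # ['2022-02-25']
```

Which of the two forms is appropriate depends on how the rows reach BigQuery; the question does not show the insert call, so treat this as an illustration rather than a diagnosis.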

Here is a complete working example of writing dates to BigQuery:

from google.cloud import bigquery
import pandas

client = bigquery.Client()

table_id = "<project>.<ds>.<table>"

records = [
    {
        "date": '2022-12-25T20:31:02',
        "data": "Final Teste",
    },
]
dataframe = pandas.DataFrame(
    records,
    columns=[
        "date",
        "data",
    ],
)


dataframe['date'] = pandas.to_datetime(dataframe['date'], infer_datetime_format=True)

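job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE),
        bigquery.SchemaField("data", bigquery.enums.SqlTypeNames.STRING),
    ],
)

# Pass job_config so the explicit schema (DATE, STRING) is applied to the load.
job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
job.result()  # Wait for the load job to finish.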

table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), table_id))

Resulting table: