将 Pandas DataFrame 写入 Google Cloud Storage 或 BigQuery
Write a Pandas DataFrame to Google Cloud Storage or BigQuery
您好,感谢您的时间和考虑。
我正在 Google Cloud Platform / Datalab 中开发 Jupyter Notebook。
我创建了一个 Pandas DataFrame 并想将此 DataFrame 写入 Google Cloud Storage(GCS) and/or BigQuery。我在 GCS 中有一个存储桶,并通过以下代码创建了以下对象:
import gcp
import gcp.storage as storage
project = gcp.Context.default().project_id
bucket_name = 'steve-temp'
bucket_path = bucket_name
bucket = storage.Bucket(bucket_path)
bucket.exists()
我根据 Google Datalab 文档尝试了各种方法,但仍然失败。
谢谢
我认为你需要将它加载到一个普通的字节变量中,并在一个单独的单元格中使用 %%storage write --variable $sample_bucketpath(参见文档)...我还在想出来...但这与我读取 CSV 文件所需要做的大致相反,我不知道它是否对写入有影响,但我不得不使用 BytesIO 来读取由 % 创建的缓冲区% storage read 命令...希望对您有所帮助,让我知道!
尝试以下工作示例:
from datalab.context import Context
import google.datalab.storage as storage
import google.datalab.bigquery as bq
import pandas as pd
# Dataframe to write
simple_dataframe = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
sample_bucket_name = Context.default().project_id + '-datalab-example'
sample_bucket_path = 'gs://' + sample_bucket_name
sample_bucket_object = sample_bucket_path + '/Hello.txt'
bigquery_dataset_name = 'TestDataSet'
bigquery_table_name = 'TestTable'
# Define storage bucket
sample_bucket = storage.Bucket(sample_bucket_name)
# Create storage bucket if it does not exist
if not sample_bucket.exists():
sample_bucket.create()
# Define BigQuery dataset and table
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(bigquery_dataset_name + '.' + bigquery_table_name)
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(simple_dataframe)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to GCS (Google Cloud Storage)
%storage write --variable simple_dataframe --object $sample_bucket_object
# Write the DataFrame to a BigQuery table
table.insert(simple_dataframe)
我用了this example, and the _table.py file from the datalab github site as a reference. You can find other datalab
source code files at thislink.
使用 Google Cloud Datalab documentation
import datalab.storage as gcs
gcs.Bucket('bucket-name').item('to/data.csv').write_to(simple_dataframe.to_csv(),'text/csv')
正在将 Pandas DataFrame 写入 BigQuery
更新@Anthonios Partheniou 的回答。
现在的代码有点不同 - 11 月。 29 2017
定义 BigQuery 数据集
将包含 project_id
和 dataset_id
的元组传递给 bq.Dataset
。
# define a BigQuery dataset
bigquery_dataset_name = ('project_id', 'dataset_id')
dataset = bq.Dataset(name = bigquery_dataset_name)
定义 BigQuery table
将包含 project_id
、dataset_id
和 table 名称的元组传递给 bq.Table
。
# define a BigQuery table
bigquery_table_name = ('project_id', 'dataset_id', 'table_name')
table = bq.Table(bigquery_table_name)
创建数据集/table并写入 BQtable
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(dataFrame_name)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to a BigQuery table
table.insert(dataFrame_name)
对于使用 Dask 的任务,我有一个更简单的解决方案。您可以将您的 DataFrame 转换为 Dask DataFrame,它可以写入 Cloud Storage 上的 csv
import dask.dataframe as dd
import pandas
df # your Pandas DataFrame
ddf = dd.from_pandas(df,npartitions=1, sort=True)
dd.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,
storage_options={'token': gcs.session.credentials})
自 2017 年以来,Pandas 具有 Dataframe to BigQuery 函数 pandas.DataFrame.to_gbq
documentation有个例子:
import pandas_gbq as gbq
gbq.to_gbq(df, 'my_dataset.my_table', projectid, if_exists='fail')
参数if_exists
可以设置为'fail'、'replace'或'append'
另请参阅此 。
上传到 Google 云存储而不写入临时文件,仅使用标准 GCS 模块
from google.cloud import storage
import os
import pandas as pd
# Only need this if you're running this code locally.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'
df = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
client = storage.Client()
bucket = client.get_bucket('my-bucket-name')
bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')
我花了很多时间找到解决这个问题的最简单方法:
import pandas as pd
df = pd.DataFrame(...)
df.to_csv('gs://bucket/path')
Google storage
:
def write_df_to_gs(df, gs_key):
df.to_csv(gs_key)
BigQuery
:
def upload_df_to_bq(df, project, bq_table):
df.to_gbq(bq_table, project_id=project, if_exists='replace')
要在 GCS 中保存 parquet 文件,并根据服务帐户进行身份验证:
df.to_parquet("gs://<bucket-name>/file.parquet",
storage_options={"token": <path-to-gcs-service-account-file>}
您好,感谢您的时间和考虑。 我正在 Google Cloud Platform / Datalab 中开发 Jupyter Notebook。 我创建了一个 Pandas DataFrame 并想将此 DataFrame 写入 Google Cloud Storage(GCS) and/or BigQuery。我在 GCS 中有一个存储桶,并通过以下代码创建了以下对象:
import gcp
import gcp.storage as storage
project = gcp.Context.default().project_id
bucket_name = 'steve-temp'
bucket_path = bucket_name
bucket = storage.Bucket(bucket_path)
bucket.exists()
我根据 Google Datalab 文档尝试了各种方法,但仍然失败。 谢谢
我认为你需要将它加载到一个普通的字节变量中,并在一个单独的单元格中使用 %%storage write --variable $sample_bucketpath(参见文档)...我还在想出来...但这与我读取 CSV 文件所需要做的大致相反,我不知道它是否对写入有影响,但我不得不使用 BytesIO 来读取由 % 创建的缓冲区% storage read 命令...希望对您有所帮助,让我知道!
尝试以下工作示例:
from datalab.context import Context
import google.datalab.storage as storage
import google.datalab.bigquery as bq
import pandas as pd
# Dataframe to write
simple_dataframe = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
sample_bucket_name = Context.default().project_id + '-datalab-example'
sample_bucket_path = 'gs://' + sample_bucket_name
sample_bucket_object = sample_bucket_path + '/Hello.txt'
bigquery_dataset_name = 'TestDataSet'
bigquery_table_name = 'TestTable'
# Define storage bucket
sample_bucket = storage.Bucket(sample_bucket_name)
# Create storage bucket if it does not exist
if not sample_bucket.exists():
sample_bucket.create()
# Define BigQuery dataset and table
dataset = bq.Dataset(bigquery_dataset_name)
table = bq.Table(bigquery_dataset_name + '.' + bigquery_table_name)
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(simple_dataframe)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to GCS (Google Cloud Storage)
%storage write --variable simple_dataframe --object $sample_bucket_object
# Write the DataFrame to a BigQuery table
table.insert(simple_dataframe)
我用了this example, and the _table.py file from the datalab github site as a reference. You can find other datalab
source code files at thislink.
使用 Google Cloud Datalab documentation
import datalab.storage as gcs
gcs.Bucket('bucket-name').item('to/data.csv').write_to(simple_dataframe.to_csv(),'text/csv')
正在将 Pandas DataFrame 写入 BigQuery
更新@Anthonios Partheniou 的回答。
现在的代码有点不同 - 11 月。 29 2017
定义 BigQuery 数据集
将包含 project_id
和 dataset_id
的元组传递给 bq.Dataset
。
# define a BigQuery dataset
bigquery_dataset_name = ('project_id', 'dataset_id')
dataset = bq.Dataset(name = bigquery_dataset_name)
定义 BigQuery table
将包含 project_id
、dataset_id
和 table 名称的元组传递给 bq.Table
。
# define a BigQuery table
bigquery_table_name = ('project_id', 'dataset_id', 'table_name')
table = bq.Table(bigquery_table_name)
创建数据集/table并写入 BQtable
# Create BigQuery dataset
if not dataset.exists():
dataset.create()
# Create or overwrite the existing table if it exists
table_schema = bq.Schema.from_data(dataFrame_name)
table.create(schema = table_schema, overwrite = True)
# Write the DataFrame to a BigQuery table
table.insert(dataFrame_name)
对于使用 Dask 的任务,我有一个更简单的解决方案。您可以将您的 DataFrame 转换为 Dask DataFrame,它可以写入 Cloud Storage 上的 csv
import dask.dataframe as dd
import pandas
df # your Pandas DataFrame
ddf = dd.from_pandas(df,npartitions=1, sort=True)
dd.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,
storage_options={'token': gcs.session.credentials})
自 2017 年以来,Pandas 具有 Dataframe to BigQuery 函数 pandas.DataFrame.to_gbq
documentation有个例子:
import pandas_gbq as gbq
gbq.to_gbq(df, 'my_dataset.my_table', projectid, if_exists='fail')
参数if_exists
可以设置为'fail'、'replace'或'append'
另请参阅此
上传到 Google 云存储而不写入临时文件,仅使用标准 GCS 模块
from google.cloud import storage
import os
import pandas as pd
# Only need this if you're running this code locally.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'
df = pd.DataFrame(data=[{1,2,3},{4,5,6}],columns=['a','b','c'])
client = storage.Client()
bucket = client.get_bucket('my-bucket-name')
bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')
我花了很多时间找到解决这个问题的最简单方法:
import pandas as pd
df = pd.DataFrame(...)
df.to_csv('gs://bucket/path')
Google storage
:
def write_df_to_gs(df, gs_key):
df.to_csv(gs_key)
BigQuery
:
def upload_df_to_bq(df, project, bq_table):
df.to_gbq(bq_table, project_id=project, if_exists='replace')
要在 GCS 中保存 parquet 文件,并根据服务帐户进行身份验证:
df.to_parquet("gs://<bucket-name>/file.parquet",
storage_options={"token": <path-to-gcs-service-account-file>}