How to Convert Pandas Data Frame Schema
I am reading a CSV file with pandas.read_csv, and it automatically detects a schema like
Column1: string
Column2: string
Column3: string
Column4: int64
Column5: double
Column6: double
__index_level_0__: int64
Then I try to write it out as a Parquet table with pyarrow.parquet.write_table. However, I want the new Parquet file to use the following schema
Column1: string
Column2: string
Column3: string
Column4: string
Column5: string
Column6: string
__index_level_0__: int64
But I get the error message "Table schema does not match schema used to create file". This is the code I use to convert the CSV file to a Parquet file, borrowed from here:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/putput.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        # parquet_schema = pa.Table.from_pandas(df=chunk).schema
        parquet_schema = pa.schema([
            ('c1', pa.string()),
            ('c2', pa.string()),
            ('c3', pa.string()),
            ('c4', pa.string()),
            ('c5', pa.string()),
            ('c6', pa.string())
        ])
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()
df = df.astype(str) will convert all of the data in the pandas data frame to strings, with object dtypes, using the built-in astype() method. You can also change the type of an individual column, for example df['Column4'] = df['Column4'].astype(str).
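As a minimal sketch of the two conversion styles (the small frame below is hypothetical, mirroring the auto-detected schema from the question):

```python
import pandas as pd

# Hypothetical frame mirroring the auto-detected schema:
# Column4 is int64, Column5/Column6 are float64
df = pd.DataFrame({
    'Column4': [1, 2],
    'Column5': [1.5, 2.5],
    'Column6': [3.0, 4.0],
})

# Convert every column to string (object dtype)
df_all = df.astype(str)

# Or convert a single column in place
df['Column4'] = df['Column4'].astype(str)

print(df_all.dtypes)
```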
All you need to do is change the type of the data frame, or of a subset of its columns, before parquet_writer.write_table(table). Put together, your code would look like this:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/putput.parquet'
chunksize = 100_000

def convert(df):
    df['Column4'] = df['Column4'].astype(str)
    return df

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        converted = convert(chunk)
        # Guess the schema of the CSV file from the first, converted chunk
        parquet_schema = pa.Table.from_pandas(df=converted).schema
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    converted = convert(chunk)
    table = pa.Table.from_pandas(converted, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()