How to Convert Pandas Data Frame Schema

I am reading a CSV file with pandas.read_csv, and it automatically detects a schema similar to the following:
Column1: string
Column2: string
Column3: string
Column4: int64
Column5: double
Column6: double
__index_level_0__: int64

I then try to write it out as a Parquet table with pyarrow.parquet.write_table. However, I want the new Parquet file to use this schema instead:

Column1: string
Column2: string
Column3: string
Column4: string
Column5: string
Column6: string
__index_level_0__: int64

But I get the error message "Table schema does not match schema used to create file". This is the code I use to convert the CSV file to a Parquet file, borrowed from here:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/output.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Originally the schema was guessed from the first chunk:
        # parquet_schema = pa.Table.from_pandas(df=chunk).schema
        # Instead, spell out the desired all-string schema manually
        parquet_schema = pa.schema([
            ('c1', pa.string()),
            ('c2', pa.string()),
            ('c3', pa.string()),
            ('c4', pa.string()),
            ('c5', pa.string()),
            ('c6', pa.string())
        ])
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()

df = df.astype(str) will convert all of the data in the pandas data frame to strings, with object dtypes, using the built-in astype() method.

You can also change the type of a single column, for example df['Column4'] = df['Column4'].astype(str).
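
For instance, a minimal sketch with made-up data that mirrors the inferred schema above (the values are hypothetical):

import pandas as pd

# Toy frame matching the int64/double columns detected by read_csv
df = pd.DataFrame({'Column4': [1, 2, 3], 'Column5': [1.5, 2.5, 3.5]})
print(df.dtypes)       # Column4: int64, Column5: float64

df = df.astype(str)    # every column becomes object (string) dtype
print(df.dtypes)       # Column4: object, Column5: object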

All you need to do is change the type of the data frame, or a subset of its columns, before parquet_writer.write_table(table). Putting it all together, your code would look like this:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/output.parquet'
chunksize = 100_000

def convert(df):
    # Cast every column that should be a string in the target schema
    for col in ['Column4', 'Column5', 'Column6']:
        df[col] = df[col].astype(str)
    return df

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    converted = convert(chunk)
    if i == 0:
        # Infer the Parquet schema from the first converted chunk
        parquet_schema = pa.Table.from_pandas(df=converted).schema

        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')

    # Write the converted chunk to the Parquet file
    table = pa.Table.from_pandas(converted, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()
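
As an aside, you can avoid the conversion step entirely by telling pandas not to infer types for those columns in the first place: read_csv accepts a dtype mapping. A sketch under the same assumptions as above (same file paths and column names):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = 'C:/input.csv'
parquet_file = 'C:/output.parquet'

# Read the string columns as str from the start; the rest keep type inference
csv_stream = pd.read_csv(csv_file, sep=',', chunksize=100_000, encoding="ISO-8859-1",
                         dtype={'Column4': str, 'Column5': str, 'Column6': str})

parquet_writer = None
for chunk in csv_stream:
    table = pa.Table.from_pandas(chunk)
    if parquet_writer is None:
        # The first chunk's schema already carries the desired string columns
        parquet_writer = pq.ParquetWriter(parquet_file, table.schema, compression='snappy')
    parquet_writer.write_table(table)

parquet_writer.close()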