dask dataframe read parquet schema difference

I do the following:

import dask.dataframe as dd
from dask.distributed import Client
client = Client()

raw_data_df = dd.read_csv('dataset/nyctaxi/nyctaxi/*.csv', assume_missing=True, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

This dataset is taken from a presentation by Matthew Rocklin and is used in the dask dataframe demos. I then try to write it to parquet with pyarrow:
raw_data_df.to_parquet(path='dataset/parquet/2015.parquet/') # only pyarrow is installed

Then I try to read it back:

raw_data_df = dd.read_parquet(path='dataset/parquet/2015.parquet/')

I get the following error:

ValueError: Schema in dataset/parquet/2015.parquet//part.192.parquet was different. 

VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": 
"float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}
vs

VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: double
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": 
"float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}

But the two schemas look identical to me. Any help figuring out what is different?

The following two numpy specifications are inconsistent:

{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'int64', 'pandas_type': 'int64'}

RateCodeID: int64 


{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'float64', 'pandas_type': 'float64'}

RateCodeID: double

(Look closely!)

I'd suggest either supplying a dtype for those columns when loading, or coercing them to floats with astype before writing.
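The first suggestion can be sketched in plain pandas (dask's `dd.read_csv` accepts the same `dtype=` mapping); the inline CSV snippet and the missing-value layout here are just an illustration:

```python
import io

import pandas as pd

# Illustrative CSV: the second row has no RateCodeID value.
csv = "VendorID,RateCodeID\n1,1\n2,\n"

# Forcing float64 up front gives every partition the same dtype,
# instead of int64 in fully populated files and float64 in files
# that happen to contain missing values.
df = pd.read_csv(io.StringIO(csv), dtype={'RateCodeID': 'float64'})
print(df['RateCodeID'].dtype)  # float64
```

With dask this would be `dd.read_csv(..., dtype={'RateCodeID': 'float64'})`, applied before the `to_parquet` call.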

This question touches on one of the thorniest problems in Pandas and Dask: the nullability of dtypes, or rather the lack of it. Missing data can cause problems, especially for dtypes that have no designated missing-value representation, such as integers.

Floats and datetimes are less troublesome, because they have designated null/missing placeholders (NaN for floats in numpy, NaT for datetimes in pandas) and are therefore nullable. But even those dtypes are problematic in some cases.

This problem can arise when you read multiple CSV files (as in your case), pull data from a database, or merge a small dataframe into a larger one. You can end up with partitions in which a given field is missing some or all of its values. For those partitions, Dask and Pandas assign the field a dtype that can hold a missing-data indicator. In the case of integers, the new dtype is float. When written to parquet, that is in turn stored as double.
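The upcast described above is easy to reproduce in plain pandas, since that is the layer where it happens:

```python
import numpy as np
import pandas as pd

# A fully populated integer column keeps its int64 dtype:
full = pd.Series([1, 2, 3])
print(full.dtype)  # int64

# Introduce a single missing value and pandas must upcast to float64,
# because numpy's int64 has no missing-data indicator:
partial = pd.Series([1, 2, np.nan])
print(partial.dtype)  # float64
```

Each CSV partition goes through exactly this inference independently, which is how one parquet file can end up `int64` and another `double`.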

Dask will happily list a somewhat misleading dtype for that field. But when you write to parquet, the partitions with missing data get written as something else. As in your case, "int64" was written as "double" in at least one parquet file. Then, when you tried to read the whole Dask dataframe back, the mismatch produced the ValueError shown above.

Before tackling any of this, you need to make sure that every row of all your Dask fields contains proper data. For example, if you have an int64 field, then NaN or any other non-integer representation of missing values will not work.
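To see why the missing values must be dealt with first, note that casting a column containing NaN straight to int64 simply raises; a minimal sketch:

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, 3.0])
try:
    s.astype('i8')
    outcome = 'cast succeeded'
except ValueError:
    # pandas refuses to cast non-finite values (NaN) to an integer dtype
    outcome = 'cast failed: NaN cannot be represented as int64'
print(outcome)
```

This is why the steps below first coerce to float, then replace NaN with a sentinel, and only then cast to int64.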

Your int64 fields can probably be fixed in a few steps:

  1. Import Pandas:

    import pandas as pd
    
  2. Clean the field's data to float64, coercing anything unparseable to NaN:

    df['myint64'] = df['myint64'].map_partitions(
        pd.to_numeric,
        meta='f8',
        errors='coerce'
    )
    
  3. Pick a flag value (e.g. -1.0) to substitute for NaN so that int64 will work:

    df['myint64'] = df['myint64'].where(
        ~df['myint64'].isna(),
        -1.0
    )
    
  4. Cast your field to int64 and persist everything:

    df['myint64'] = df['myint64'].astype('i8')
    df = client.persist(df)
    
  5. Then try your save-and-reread round trip.

Note: steps 1-2 can also be used to fix float64 fields.
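The cleaning logic in steps 2-4 can be checked on a small in-memory example with plain pandas (the same calls that `map_partitions` applies per partition); the column values here are made up:

```python
import numpy as np
import pandas as pd

# A column that picked up missing and malformed values along the way:
s = pd.Series([1.0, 2.0, np.nan, 'bad'], dtype=object)

cleaned = pd.to_numeric(s, errors='coerce')      # 'bad' -> NaN, dtype float64
flagged = cleaned.where(~cleaned.isna(), -1.0)   # sentinel -1.0 for missing
as_int = flagged.astype('i8')                    # now safe to cast to int64

print(as_int.tolist())  # [1, 2, -1, -1]
print(as_int.dtype)     # int64
```

Once every partition has gone through this, all parquet files are written with the same int64 schema and the read-back error disappears.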

Finally, to fix datetime fields, try this:

    df['mydatetime'] = df['mydatetime'].map_partitions(
        pd.to_datetime,
        meta='M8[ns]',
        infer_datetime_format=True,
        errors='coerce'
    ).persist()
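The datetime coercion behaves the same way as the numeric one; a minimal pandas sketch with made-up values:

```python
import pandas as pd

raw = pd.Series(['2015-01-01 00:11:00', 'garbage'])

# errors='coerce' turns unparseable entries into NaT instead of raising:
ts = pd.to_datetime(raw, errors='coerce')
print(ts.dtype)            # datetime64[ns]
print(ts.isna().tolist())  # [False, True] -- the bad value became NaT
```

Since datetime64 has NaT as a native missing-value placeholder, no sentinel substitution is needed before writing to parquet.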