dask dataframe read parquet schema difference
I do the following:
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
raw_data_df = dd.read_csv('dataset/nyctaxi/nyctaxi/*.csv', assume_missing=True, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
The dataset is taken from a presentation by Matthew Rocklin and is used as a dask dataframe demo. I then try to write it to parquet using pyarrow:
raw_data_df.to_parquet(path='dataset/parquet/2015.parquet/') # only pyarrow is installed
Then, trying to read it back:
raw_data_df = dd.read_parquet(path='dataset/parquet/2015.parquet/')
I get the following error:
ValueError: Schema in dataset/parquet/2015.parquet//part.192.parquet was different.
VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": 
"float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}
vs
VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: double
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": 
"float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}
But the two schemas look identical to me. Any help determining the cause?
These two numpy specifications disagree:
{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'int64', 'pandas_type': 'int64'}
RateCodeID: int64
{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'float64', 'pandas_type': 'float64'}
RateCodeID: double
(Look closely!)
I suggest you either supply a dtype for these columns at load time, or coerce them to float with astype before writing.
This problem touches one of the thorniest issues in Pandas and Dask: the nullability, or lack of it, of data types. Missing data causes trouble, especially for dtypes that have no designated missing-value representation, such as integers.
Floats and datetimes fare better because they have designated null/missing placeholders (NaN for floats in numpy, NaT for datetimes in pandas) and are therefore nullable. But even those dtypes can be problematic in some situations.
The problem can arise when you read multiple CSV files (as in your case), pull data from a database, or merge a small dataframe into a larger one. You can end up with partitions in which some or all values of a given field are missing. For those partitions, Dask and Pandas assign the field a dtype that can accommodate a missing-data indicator. For integers, the new dtype is float, which is then written to parquet as double.
Dask will happily list a somewhat misleading dtype for the field. But when you write to parquet, the partitions with missing data get written as something else. As in your case, an "int64" field was written as "double" in at least one parquet file. Then, when you try to read the whole Dask dataframe back, the mismatch produces the ValueError shown above.
Before these issues can be resolved, you need to make sure every row of every Dask field holds suitable data. For example, if you have an int64 field, then NaN, or any other non-integer representation of a missing value, will not work.
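The integer limitation is easy to see directly in pandas:

```python
import pandas as pd

# A gap-free integer column keeps its int64 dtype...
complete = pd.Series([1, 2, 3])

# ...but a single missing value silently promotes the column to float64,
# because numpy's int64 has no slot for NaN.
with_gap = pd.Series([1, 2, None])

print(complete.dtype, with_gap.dtype)  # int64 float64
```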
Your int64 field can probably be fixed in a few steps:
Import Pandas:
import pandas as pd
Clean the field's data to float64, coercing missing values to NaN:
df['myint64'] = df['myint64'].map_partitions(
    pd.to_numeric,
    meta='f8',
    errors='coerce'
)
Select a flag value (e.g. -1.0) to stand in for NaN so that int64 can work:
df['myint64'] = df['myint64'].where(
    ~df['myint64'].isna(),
    -1.0
)
Cast your field to int64 and persist everything:
df['myint64'] = df['myint64'].astype('i8')
df = client.persist(df)
Then try the save-and-reread round trip.
Note: steps 1-2 can also be used to fix float64 fields.
Finally, to fix datetime fields, try this:
df['mydatetime'] = df['mydatetime'].map_partitions(
    pd.to_datetime,
    meta='M8[ns]',
    infer_datetime_format=True,
    errors='coerce'
).persist()