Dask Parquet 使用数据模式加载文件

Dask Parquet loading files with data schema

这是一个与此相关的问题

我正在试验 Dask 和 Parquet 文件。我加载了我下载的纽约停车违规数据 here

我读取数据文件,找到公共列,应用数据类型,然后将所有内容保存为 parquet collevtion

from dask import dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np

base_url = 'origin/nyc-parking-tickets/'

fy14 = dd.read_csv(base_url + '*2014_.csv')
fy15 = dd.read_csv(base_url + '*2015.csv')
fy16 = dd.read_csv(base_url + '*2016.csv')
fy17 = dd.read_csv(base_url + '*2017.csv')

data = [fy14, fy15, fy16, fy17]
col_set = [set(d.columns) for d in data]
common_columns = list(set.intersection(*col_set))

# Set proper column types
dtype_tuples = [(x, np.str) for x in common_columns]
dtypes = dict(dtype_tuples)

floats = ['Feet From Curb', 'Issuer Code', 'Issuer Precinct', 'Law Section', 'Vehicle Year', 'Violation Precinct']
ints32 = ['Street Code1', 'Street Code2', 'Street Code3', 'Summons Number']
ints16 = ['Violation Code']

for item in floats: dtypes[item] = np.float32
for item in ints32: dtypes[item] = np.int32
for item in ints16: dtypes[item] = np.int16

# Read Data
data = dd.read_csv(base_url + '*.csv', dtype=dtypes, usecols=common_columns) # usecols not in Dask documentation, but from pandas

# Write data as parquet
target_url = 'target/nyc-parking-tickets-pq/'
with ProgressBar():
    data.to_parquet(target_url)

当我尝试重新加载数据时

data2 = dd.read_parquet(target_url, engine='pyarrow')

我收到一个 ValueError,即某些分区具有不同的文件格式。查看输出,我可以看到 'Violation Legal Code' 列在一个分区中被解释为 null,大概是因为数据太稀疏而无法采样。

在原问题的post中提出了两个解决方案。第一个是关于输入虚拟值,另一个是在加载数据时提供列类型。我想做后者,但我被卡住了。 在 dd.read_csv 方法中,我可以传递 dtype 参数,为此我只需输入上面定义的 dtypes 字典。 dd.read_parquet 不接受该关键字。在 documentation 中似乎暗示 categories 正在接管那个角色,但即使通过 categories=dtypes,我仍然得到同样的错误。

如何在 dask.dataframe.read_parquet 中传递类型规范?

您不能将数据类型传递给 read_parquet,因为 Parquet 文件知道它们自己的数据类型(在 CSV 中它是不明确的)。 Dask DataFrame 期望一个数据集的所有文件都具有相同的模式,截至2019-03-26,不支持加载混合模式的数据。

话虽如此,您可以使用 Dask Delayed 之类的工具自己完成此操作,在逐个文件的基础上执行您需要执行的任何操作,然后使用 dd.from_delayed 将它们转换为 Dask DataFrame .有关更多信息,请参见此处。

看来问题出在镶木地板引擎上。当我将代码更改为

data.to_parquet(target_url, engine = 'fastparquet')

data.from_parquet(target_url, engine = 'fastparquet')

写入和加载工作正常。