为什么在 dask 中计算索引 Parquet 文件的形状如此缓慢?
Why is computing the shape on an indexed Parquet file so slow in dask?
我从同一文件夹中的多个 Parquet 文件创建了一个 Parquet 文件。每个文件对应一个分区
Parquet 文件是在不同的进程中创建的(使用 Python concurrent.futures
)。这是我在一个过程中 运行 的代码示例:
# `df` is a standard Pandas DataFrame with
# 22 columns of different types and at most 100e3 rows.
# Set the index
df.set_index("cid", inplace=True)
# Write to single file
fastparquet.write(fpath, df, compression='snappy, file_scheme='simple)
df
最多包含 100e3
行(和 22 列)并在整数索引(称为 cid
)上建立索引。
然后我使用以下方法创建了两个元数据文件:
# `data_paths` contains the list of all the Parquet data files
# created in multiple processes.
fastparquet.writer.merge(data_paths, verify_schema=True)
确实 _metadata
和 _common_metadata
已在包含所有 Parquet 文件的文件夹中正确创建。
我天真地认为,因为数据被索引and/or它有元数据文件,获取数据大小等基本信息应该很快。例如,以下需要永远:
import dask.dataframe as ds
# `dataset_path` is the path to the folder
# containing all the Parquet files created above
# and the metadata files.
# It contains ~100-200 individual Parquet files
# for a total of ~60,000,000 rows
data = df.read_parquet(dataset_path)
data.shape[0].compute()
那是例外吗?
另请注意,大多数列为 int64
、float64
,少数为 object
(string
大小不同。
不幸的是,从元数据中提取数据帧长度的优化尚不存在。相反,dask 将每个分区加载到内存中并测量其长度。您会注意到,如果您 select 单个列(或索引),这种情况发生得更快:
len(data[onecolumn])
但是,您说的很对,对于 parquet 的特殊情况,可以从一组或多组元数据中预先知道长度,并且能够一次获得它会很好。请随时在 Dask 问题跟踪器上请求此功能。现在,您可以使用 fastparquet.ParquetFile
.
的 count
和 columns
属性
就个人而言,我直接使用 fastparquet 访问 Parquet 元数据,而不是 Dask。
元数据中有很多数据,值得一试。
我进一步注意到,如果你有很多文件,你可以将 fastparquet 操作放在延迟函数中,以使用 dask 并行读取 parquet 元数据。例如:
@dask.delayed
def read_pf(path_to_parquet_file):
pf = fastparquet.ParquetFile(path_to_parquet_file)
all_stats = pf.statistics.copy()
all_info = pf.info.copy()
我从同一文件夹中的多个 Parquet 文件创建了一个 Parquet 文件。每个文件对应一个分区
Parquet 文件是在不同的进程中创建的(使用 Python concurrent.futures
)。这是我在一个过程中 运行 的代码示例:
# `df` is a standard Pandas DataFrame with
# 22 columns of different types and at most 100e3 rows.
# Set the index
df.set_index("cid", inplace=True)
# Write to single file
fastparquet.write(fpath, df, compression='snappy, file_scheme='simple)
df
最多包含 100e3
行(和 22 列)并在整数索引(称为 cid
)上建立索引。
然后我使用以下方法创建了两个元数据文件:
# `data_paths` contains the list of all the Parquet data files
# created in multiple processes.
fastparquet.writer.merge(data_paths, verify_schema=True)
确实 _metadata
和 _common_metadata
已在包含所有 Parquet 文件的文件夹中正确创建。
我天真地认为,因为数据被索引and/or它有元数据文件,获取数据大小等基本信息应该很快。例如,以下需要永远:
import dask.dataframe as ds
# `dataset_path` is the path to the folder
# containing all the Parquet files created above
# and the metadata files.
# It contains ~100-200 individual Parquet files
# for a total of ~60,000,000 rows
data = df.read_parquet(dataset_path)
data.shape[0].compute()
那是例外吗?
另请注意,大多数列为 int64
、float64
,少数为 object
(string
大小不同。
不幸的是,从元数据中提取数据帧长度的优化尚不存在。相反,dask 将每个分区加载到内存中并测量其长度。您会注意到,如果您 select 单个列(或索引),这种情况发生得更快:
len(data[onecolumn])
但是,您说的很对,对于 parquet 的特殊情况,可以从一组或多组元数据中预先知道长度,并且能够一次获得它会很好。请随时在 Dask 问题跟踪器上请求此功能。现在,您可以使用 fastparquet.ParquetFile
.
count
和 columns
属性
就个人而言,我直接使用 fastparquet 访问 Parquet 元数据,而不是 Dask。
元数据中有很多数据,值得一试。
我进一步注意到,如果你有很多文件,你可以将 fastparquet 操作放在延迟函数中,以使用 dask 并行读取 parquet 元数据。例如:
@dask.delayed
def read_pf(path_to_parquet_file):
pf = fastparquet.ParquetFile(path_to_parquet_file)
all_stats = pf.statistics.copy()
all_info = pf.info.copy()