如何使用 dask/fastparquet 从多个目录读取多个镶木地板文件(具有相同架构)
How to read multiple parquet files (with same schema) from multiple directories with dask/fastparquet
我需要使用 dask 将具有相同架构的多个镶木地板文件加载到单个数据框中。当它们都在同一个目录中时这有效,但当它们在不同的目录中时无效。
例如:
import fastparquet
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq'])
工作正常,但如果我将 data2.parq
复制到不同的目录,则以下内容不起作用:
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])
我得到的回溯如下:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-11-b3d381f14edc> in <module>()
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
82 if isinstance(fn, (tuple, list)):
83 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 84 open_with=open_with)
85 self.fn = sep.join([basepath, '_metadata']) # effective file
86 self.fmd = fmd
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with)
164 else:
165 raise ValueError("Merge requires all PaquetFile instances or none")
--> 166 basepath, file_list = analyse_paths(file_list, sep)
167
168 if verify_schema:
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep)
221 if len({tuple([p.split('=')[0] for p in parts[l:-1]])
222 for parts in path_parts_list}) > 1:
--> 223 raise ValueError('Partitioning directories do not agree')
224 for path_parts in path_parts_list:
225 for path_part in path_parts[l:-1]:
ValueError: Partitioning directories do not agree
我在使用 dask.dataframe.read_parquet
时遇到同样的错误,我假设它使用相同的 ParquetFile
对象。
如何从不同目录加载多个文件?将我需要加载的所有文件放在同一个目录中不是一个选项。
解决方法是分别读取每个块并传递给 dask.dataframe.from_delayed
。这与 read_parquet
所做的元数据处理不完全相同('index'
下方应该是索引),但除此之外应该可以工作。
import dask.dataframe as dd
from dask import delayed
from fastparquet import ParquetFile
@delayed
def load_chunk(pth):
return ParquetFile(pth).to_pandas()
files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.from_delayed([load_chunk(f) for f in files])
df.compute()
Out[38]:
index a
0 0 1
1 1 2
0 2 3
1 3 4
如果使用绝对路径或显式相对路径,这在 master 上的 fastparquet 中确实有效:
pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq'])
前导 ./
的需要应该被认为是一个错误 - 请参阅问题。
To read from multiple files you can pass a globstring or a list of paths [...].
以下解决方案允许在各个 parquet 文件中使用不同的列,这对于 是不可能的。它将被并行化,因为它是本机 dask 命令。
import dask.dataframe as dd
files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.read_parquet(files)
df.compute()
我需要使用 dask 将具有相同架构的多个镶木地板文件加载到单个数据框中。当它们都在同一个目录中时这有效,但当它们在不同的目录中时无效。
例如:
import fastparquet
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq'])
工作正常,但如果我将 data2.parq
复制到不同的目录,则以下内容不起作用:
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])
我得到的回溯如下:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-11-b3d381f14edc> in <module>()
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq'])
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep)
82 if isinstance(fn, (tuple, list)):
83 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 84 open_with=open_with)
85 self.fn = sep.join([basepath, '_metadata']) # effective file
86 self.fmd = fmd
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with)
164 else:
165 raise ValueError("Merge requires all PaquetFile instances or none")
--> 166 basepath, file_list = analyse_paths(file_list, sep)
167
168 if verify_schema:
~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep)
221 if len({tuple([p.split('=')[0] for p in parts[l:-1]])
222 for parts in path_parts_list}) > 1:
--> 223 raise ValueError('Partitioning directories do not agree')
224 for path_parts in path_parts_list:
225 for path_part in path_parts[l:-1]:
ValueError: Partitioning directories do not agree
我在使用 dask.dataframe.read_parquet
时遇到同样的错误,我假设它使用相同的 ParquetFile
对象。
如何从不同目录加载多个文件?将我需要加载的所有文件放在同一个目录中不是一个选项。
解决方法是分别读取每个块并传递给 dask.dataframe.from_delayed
。这与 read_parquet
所做的元数据处理不完全相同('index'
下方应该是索引),但除此之外应该可以工作。
import dask.dataframe as dd
from dask import delayed
from fastparquet import ParquetFile
@delayed
def load_chunk(pth):
return ParquetFile(pth).to_pandas()
files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.from_delayed([load_chunk(f) for f in files])
df.compute()
Out[38]:
index a
0 0 1
1 1 2
0 2 3
1 3 4
如果使用绝对路径或显式相对路径,这在 master 上的 fastparquet 中确实有效:
pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq'])
前导 ./
的需要应该被认为是一个错误 - 请参阅问题。
To read from multiple files you can pass a globstring or a list of paths [...].
以下解决方案允许在各个 parquet 文件中使用不同的列,这对于
import dask.dataframe as dd
files = ['temp/part.0.parquet', 'temp2/part.1.parquet']
df = dd.read_parquet(files)
df.compute()