使用 dask 将多个 MAT 文件导入一个 DataFrame
Using dask to import many MAT files into one DataFrame
我有很多相同格式的 mat 文件,我希望将这些 mat 文件加入一个带有 DatetimeIndex 的 DataFrame 中。目前,for 循环读取这些 mat 文件并使用 scipy.io.loadmat 将每个文件的内容加载到 pandas DataFrame 中,然后将每个 DataFrame 附加到 hdf5 table.
每个 mat 文件包含一个 4096x1024 单精度矩阵,最初循环的每次迭代大约需要 1.5 秒。我已经用 806 个 mat 文件(12.5GB 大约需要 25 分钟)对此进行了测试,但我想将其应用于可能有数百万个这样的文件,我有兴趣找到一个工作流和数据容器,让我可以导入新数据和查询时间序列的子集很快。
是否可以使用 dask 或其他工具来加速此导入过程并创建单个可查询时间序列?
for rot_file in rotation_files:
print(rot_file)
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)
似乎可以使用 dask.delayed 进行导入,但我不确定如何将其写入单个 hdf 文件。
为了查询数据,您不需要写入dask明确支持的数据格式。您可以按如下方式定义数据框:
def mat_to_dataframe(rot_file):
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
return polar_image
from dask import delayed
import dask.dataframe as dd
parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)
这是一个 "lazy" 数据框:您可以对其应用类似 pandas 的计算,但这些仅在您调用 .compute()
时执行。如果 matload 进程持有 python GIL,那么我建议使用分布式调度程序(即使在单机上)client = dask.distributed.Client()
.
如果你能先验知道每个section的时间戳,那么你也可以提供divisions=
到from_delayed
,这意味着如果你的查询在索引上有过滤器,那么dask就会知道哪些文件不需要加载。
如果加载过程很慢,并且您想要更快的查询格式,请尝试 df.to_hdf
或 df.to_parquet
。每个都有几个选项会影响您的表现。
请注意,使用 pd.to_datetime(time_stamps[0])
可能会更快地实现 time_stamps[0].apply(convert_to_python_datetime).values
。
我有很多相同格式的 mat 文件,我希望将这些 mat 文件加入一个带有 DatetimeIndex 的 DataFrame 中。目前,for 循环读取这些 mat 文件并使用 scipy.io.loadmat 将每个文件的内容加载到 pandas DataFrame 中,然后将每个 DataFrame 附加到 hdf5 table.
每个 mat 文件包含一个 4096x1024 单精度矩阵,最初循环的每次迭代大约需要 1.5 秒。我已经用 806 个 mat 文件(12.5GB 大约需要 25 分钟)对此进行了测试,但我想将其应用于可能有数百万个这样的文件,我有兴趣找到一个工作流和数据容器,让我可以导入新数据和查询时间序列的子集很快。
是否可以使用 dask 或其他工具来加速此导入过程并创建单个可查询时间序列?
for rot_file in rotation_files:
print(rot_file)
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)
似乎可以使用 dask.delayed 进行导入,但我不确定如何将其写入单个 hdf 文件。
为了查询数据,您不需要写入dask明确支持的数据格式。您可以按如下方式定义数据框:
def mat_to_dataframe(rot_file):
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
return polar_image
from dask import delayed
import dask.dataframe as dd
parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)
这是一个 "lazy" 数据框:您可以对其应用类似 pandas 的计算,但这些仅在您调用 .compute()
时执行。如果 matload 进程持有 python GIL,那么我建议使用分布式调度程序(即使在单机上)client = dask.distributed.Client()
.
如果你能先验知道每个section的时间戳,那么你也可以提供divisions=
到from_delayed
,这意味着如果你的查询在索引上有过滤器,那么dask就会知道哪些文件不需要加载。
如果加载过程很慢,并且您想要更快的查询格式,请尝试 df.to_hdf
或 df.to_parquet
。每个都有几个选项会影响您的表现。
请注意,使用 pd.to_datetime(time_stamps[0])
可能会更快地实现 time_stamps[0].apply(convert_to_python_datetime).values
。