使用 dask 将多个 MAT 文件导入一个 DataFrame

Using dask to import many MAT files into one DataFrame

我有很多相同格式的 mat 文件,我希望将这些 mat 文件加入一个带有 DatetimeIndex 的 DataFrame 中。目前,for 循环读取这些 mat 文件并使用 scipy.io.loadmat 将每个文件的内容加载到 pandas DataFrame 中,然后将每个 DataFrame 附加到 hdf5 table.

每个 mat 文件包含一个 4096x1024 单精度矩阵,最初循环的每次迭代大约需要 1.5 秒。我已经用 806 个 mat 文件(12.5GB 大约需要 25 分钟)对此进行了测试,但我想将其应用于可能有数百万个这样的文件,我有兴趣找到一个工作流和数据容器,让我可以导入新数据和查询时间序列的子集很快。

是否可以使用 dask 或其他工具来加速此导入过程并创建单个可查询时间序列?

for rot_file in rotation_files:
    print(rot_file)
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)

似乎可以使用 dask.delayed 进行导入,但我不确定如何将其写入单个 hdf 文件。

为了查询数据,您不需要写入dask明确支持的数据格式。您可以按如下方式定义数据框:

def mat_to_dataframe(rot_file):
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    return polar_image

from dask import delayed
import dask.dataframe as dd

parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)

这是一个 "lazy" 数据框:您可以对其应用类似 pandas 的计算,但这些仅在您调用 .compute() 时执行。如果 matload 进程持有 python GIL,那么我建议使用分布式调度程序(即使在单机上)client = dask.distributed.Client().

如果你能先验知道每个section的时间戳,那么你也可以提供divisions=from_delayed,这意味着如果你的查询在索引上有过滤器,那么dask就会知道哪些文件不需要加载。

如果加载过程很慢,并且您想要更快的查询格式,请尝试 df.to_hdfdf.to_parquet。每个都有几个选项会影响您的表现。

请注意,使用 pd.to_datetime(time_stamps[0]) 可能会更快地实现 time_stamps[0].apply(convert_to_python_datetime).values