Python 和 Dask - 读取和连接多个文件

Python and Dask - reading and concatenating multiple files

我有一些 parquet 文件,它们都来自同一个域,但结构有些不同。我需要连接所有这些。下面是这些文件的一些示例:

file 1:
A,B
True,False
False,False

file 2:
A,C
True,False
False,True
True,True

我想要做的是以最快的方式读取和连接这些文件,以获得以下结果:

A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True

为此,我使用以下代码,使用 (Reading multiple files with Dask, ):

提取
import glob

import dask.dataframe as dd
from dask.distributed import Client
import dask

def read_parquet(path):
    return pd.read_parquet(path)

if __name__=='__main__':

    files = glob.glob('test/*/file.parquet')

    print('Start dask client...')
    client = Client()

    results = [dd.from_delayed(dask.delayed(read_parquet)(diag)) for diag in diag_files]

    results = dd.concat(results).compute()

    client.close()

这段代码有效,它已经是我能想到的最快的版本(我尝试了顺序 pandasmultiprocessing.Pool)。我的想法是 Dask 可以理想地开始部分连接,同时仍然读取一些文件,但是,从任务图中我看到每个镶木地板文件的元数据的一些顺序读取,请参见下面的屏幕截图:

任务图的第一部分是 read_parquetread_metadata 的混合。第一部分始终只显示执行的 1 个任务(在任务处理选项卡中)。第二部分是 from_delayedconcat 的组合,它使用了我所有的工人。

关于如何加快文件读取速度并减少图表第一部分的执行时间有什么建议吗?

您的代码存在问题,您使用的是 Pandas 版本 read_parquet.

改为使用:

  • dask 版本 read_parquet,
  • mapgather 方法由 Client,
  • 提供
  • dask 版本 concat,

类似于:

def read_parquet(path):
    return dd.read_parquet(path)

def myRead():
    L = client.map(read_parquet, glob.glob('file_*.parquet'))
    lst = client.gather(L)
    return dd.concat(lst)

result = myRead().compute()

在此之前,我只创建了一个 客户端。 原因是在我之前的实验中我得到了一个错误 当我试图再次创建它时(在一个函数中)出现消息,甚至 虽然之前已经关闭了第一个实例。