使用 Dask 加载许多具有不同列的 CSV 文件

using Dask to load many CSV files with different columns

我在 AWS s3 中保存了很多 CSV 文件,它们具有相同的第一组列和许多可选列。我不想一个一个地下载它们然后使用 pd.concat 来阅读它,因为这会花费很多时间并且必须适合计算机内存。相反,我正在尝试使用 Dask 加载和汇总所有这些文件,此时应将可选列视为零。

如果我可以使用相同的所有列:

    import dask.dataframe as dd
    addr = "s3://SOME_BASE_ADDRESS/*.csv"
    df = dd.read_csv(addr)
    df.groupby(["index"]).sum().compute()

但它不适用于列数不同的文件,因为 Dask 假定它可以对所有文件使用第一列:

File ".../lib/python3.7/site-packages/pandas/core/internals/managers.py", line 155, in set_axis 'values have {new} elements'.format(old=old_len, new=new_len)) ValueError: Length mismatch: Expected axis has 64 elements, new values have 62 elements

根据 this thread,我可以提前阅读所有 headers(例如,在我生成并保存所有小 CSV 时编写它们)或使用类似这样的东西:

df = dd.concat([dd.read_csv(f) for f in filelist])

我想知道这个解决方案是否真的 faster/better 而不是直接使用 pandas?一般来说,我想知道解决此问题的最佳(主要是最快)方法是什么?

在将数据帧转换为 dask 数据帧之前使用 delayed 标准化数据帧可能是个好主意(很难判断这是否最适合您的用例)。

import dask.dataframe as dd
from dask import delayed

list_files = [...] # create a list of files inside s3 bucket
list_cols_to_keep = ['col1', 'col2']

@delayed
def standard_csv(file_path):
    df = pd.read_csv(file_path)
    df = df[list_cols_to_keep]
    # add any other standardization routines, e.g. dtype conversion
    return df

ddf = dd.from_delayed([standard_csv(f) for f in list_files])

我最终放弃了使用 Dask,因为它太慢了,我使用 aws s3 sync 来下载数据,并使用 multiprocessing.Pool 来读取和连接它们:

# download:
def sync_outputs(out_path):
    local_dir_path = f"/tmp/outputs/"
    safe_mkdir(os.path.dirname(local_dir_path))
    cmd = f'aws s3 sync {job_output_dir} {local_dir_path} > /tmp/null' # the last part is to avoid prints
    os.system(cmd)
    return local_dir_path

# concat:
def read_csv(path):
    return pd.read_csv(path,index_col=0)

def read_csvs_parallel(local_paths):
    from multiprocessing import Pool
    import os
    with Pool(os.cpu_count()) as p:
        csvs = list(tqdm(p.imap(read_csv, local_paths), desc='reading csvs', total=len(paths)))
    return csvs

# all togeter:
def concat_csvs_parallel(out_path):
    local_paths = sync_outputs(out_path)
    csvs = read_csvs_parallel(local_paths)
    df = pd.concat(csvs)
    return df

aws s3 sync 在大约 30 秒内下载了大约 1000 个文件(每个文件约 1KB),读取比使用多处理(8 核)需要 3 秒,这比使用 [=15 下载文件快得多=](1000 个文件将近 2 分钟)