使用 Dask 加载许多具有不同列的 CSV 文件
using Dask to load many CSV files with different columns
我在 AWS s3 中保存了很多 CSV 文件,它们具有相同的第一组列和许多可选列。我不想一个一个地下载它们然后使用 pd.concat
来阅读它,因为这会花费很多时间并且必须适合计算机内存。相反,我正在尝试使用 Dask 加载和汇总所有这些文件,此时应将可选列视为零。
如果我可以使用相同的所有列:
import dask.dataframe as dd
addr = "s3://SOME_BASE_ADDRESS/*.csv"
df = dd.read_csv(addr)
df.groupby(["index"]).sum().compute()
但它不适用于列数不同的文件,因为 Dask 假定它可以对所有文件使用第一列:
File ".../lib/python3.7/site-packages/pandas/core/internals/managers.py", line 155, in set_axis
'values have {new} elements'.format(old=old_len, new=new_len))
ValueError: Length mismatch: Expected axis has 64 elements, new values have 62 elements
根据 this thread,我可以提前阅读所有 headers(例如,在我生成并保存所有小 CSV 时编写它们)或使用类似这样的东西:
df = dd.concat([dd.read_csv(f) for f in filelist])
我想知道这个解决方案是否真的 faster/better 而不是直接使用 pandas?一般来说,我想知道解决此问题的最佳(主要是最快)方法是什么?
在将数据帧转换为 dask 数据帧之前使用 delayed
标准化数据帧可能是个好主意(很难判断这是否最适合您的用例)。
import dask.dataframe as dd
from dask import delayed
list_files = [...] # create a list of files inside s3 bucket
list_cols_to_keep = ['col1', 'col2']
@delayed
def standard_csv(file_path):
df = pd.read_csv(file_path)
df = df[list_cols_to_keep]
# add any other standardization routines, e.g. dtype conversion
return df
ddf = dd.from_delayed([standard_csv(f) for f in list_files])
我最终放弃了使用 Dask
,因为它太慢了,我使用 aws s3 sync
来下载数据,并使用 multiprocessing.Pool
来读取和连接它们:
# download:
def sync_outputs(out_path):
local_dir_path = f"/tmp/outputs/"
safe_mkdir(os.path.dirname(local_dir_path))
cmd = f'aws s3 sync {job_output_dir} {local_dir_path} > /tmp/null' # the last part is to avoid prints
os.system(cmd)
return local_dir_path
# concat:
def read_csv(path):
return pd.read_csv(path,index_col=0)
def read_csvs_parallel(local_paths):
from multiprocessing import Pool
import os
with Pool(os.cpu_count()) as p:
csvs = list(tqdm(p.imap(read_csv, local_paths), desc='reading csvs', total=len(paths)))
return csvs
# all togeter:
def concat_csvs_parallel(out_path):
local_paths = sync_outputs(out_path)
csvs = read_csvs_parallel(local_paths)
df = pd.concat(csvs)
return df
aws s3 sync
在大约 30 秒内下载了大约 1000 个文件(每个文件约 1KB),读取比使用多处理(8 核)需要 3 秒,这比使用 [=15 下载文件快得多=](1000 个文件将近 2 分钟)
我在 AWS s3 中保存了很多 CSV 文件,它们具有相同的第一组列和许多可选列。我不想一个一个地下载它们然后使用 pd.concat
来阅读它,因为这会花费很多时间并且必须适合计算机内存。相反,我正在尝试使用 Dask 加载和汇总所有这些文件,此时应将可选列视为零。
如果我可以使用相同的所有列:
import dask.dataframe as dd
addr = "s3://SOME_BASE_ADDRESS/*.csv"
df = dd.read_csv(addr)
df.groupby(["index"]).sum().compute()
但它不适用于列数不同的文件,因为 Dask 假定它可以对所有文件使用第一列:
File ".../lib/python3.7/site-packages/pandas/core/internals/managers.py", line 155, in set_axis 'values have {new} elements'.format(old=old_len, new=new_len)) ValueError: Length mismatch: Expected axis has 64 elements, new values have 62 elements
根据 this thread,我可以提前阅读所有 headers(例如,在我生成并保存所有小 CSV 时编写它们)或使用类似这样的东西:
df = dd.concat([dd.read_csv(f) for f in filelist])
我想知道这个解决方案是否真的 faster/better 而不是直接使用 pandas?一般来说,我想知道解决此问题的最佳(主要是最快)方法是什么?
在将数据帧转换为 dask 数据帧之前使用 delayed
标准化数据帧可能是个好主意(很难判断这是否最适合您的用例)。
import dask.dataframe as dd
from dask import delayed
list_files = [...] # create a list of files inside s3 bucket
list_cols_to_keep = ['col1', 'col2']
@delayed
def standard_csv(file_path):
df = pd.read_csv(file_path)
df = df[list_cols_to_keep]
# add any other standardization routines, e.g. dtype conversion
return df
ddf = dd.from_delayed([standard_csv(f) for f in list_files])
我最终放弃了使用 Dask
,因为它太慢了,我使用 aws s3 sync
来下载数据,并使用 multiprocessing.Pool
来读取和连接它们:
# download:
def sync_outputs(out_path):
local_dir_path = f"/tmp/outputs/"
safe_mkdir(os.path.dirname(local_dir_path))
cmd = f'aws s3 sync {job_output_dir} {local_dir_path} > /tmp/null' # the last part is to avoid prints
os.system(cmd)
return local_dir_path
# concat:
def read_csv(path):
return pd.read_csv(path,index_col=0)
def read_csvs_parallel(local_paths):
from multiprocessing import Pool
import os
with Pool(os.cpu_count()) as p:
csvs = list(tqdm(p.imap(read_csv, local_paths), desc='reading csvs', total=len(paths)))
return csvs
# all togeter:
def concat_csvs_parallel(out_path):
local_paths = sync_outputs(out_path)
csvs = read_csvs_parallel(local_paths)
df = pd.concat(csvs)
return df
aws s3 sync
在大约 30 秒内下载了大约 1000 个文件(每个文件约 1KB),读取比使用多处理(8 核)需要 3 秒,这比使用 [=15 下载文件快得多=](1000 个文件将近 2 分钟)