dask 可以并行化从 csv 文件中读取数据吗?
Can dask parralelize reading fom a csv file?
我正在将一个大型文本文件转换为一个 hdf 存储,以期获得更快的数据访问速度。转换工作正常,但是从 csv 文件读取不是并行完成的。它真的很慢(SSD 上一个 1GB 的文本文件大约需要 30 分钟,所以我猜它不是 IO-bound)。
有没有办法让它在多个线程中并行读取?
因为它可能很重要,我目前被迫 运行 在 Windows 下——以防万一有什么不同。
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df.categorize([ 'Type',
'Condition',
])
df.to_hdf("data/data.hdf", "Measurements", 'w')
是的,dask.dataframe可以并行读取。但是,您 运行 遇到了两个问题:
Pandas.read_csv 仅部分释放 GIL
默认情况下 dask.dataframe 与线程并行化,因为大多数 Pandas 可以 运行 在多个线程中并行化(释放 GIL)。 Pandas.read_csv 是一个例外,特别是如果您生成的数据帧使用对象数据类型作为文本
dask.dataframe.to_hdf(filename) 强制顺序计算
写入单个 HDF 文件将强制顺序计算(很难并行写入单个文件。)
编辑:新解决方案
今天我会避免使用 HDF 而改用 Parquet。我可能会使用多处理或 dask.distributed 调度程序来避免单台机器上的 GIL 问题。这两者的结合应该给你完整的线性缩放。
from dask.distributed import Client
client = Client()
df = dask.dataframe.read_csv(...)
df.to_parquet(...)
解决方案
因为您的数据集可能适合内存,所以使用 dask.dataframe.read_csv 与多个进程并行加载,然后立即切换到 Pandas.
import dask.dataframe as ddf
import dask.multiprocessing
df = ddf.read_csv("data/Measurements*.csv", # read in parallel
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(get=dask.multiprocessing.get) # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
借助@MRocklin 的回答,在较新版本的 dask 中,您可以使用 df.compute(scheduler='processes')
或 df.compute(scheduler='threads')
使用多处理或多线程转换为 pandas:
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(scheduler='processes') # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
我正在将一个大型文本文件转换为一个 hdf 存储,以期获得更快的数据访问速度。转换工作正常,但是从 csv 文件读取不是并行完成的。它真的很慢(SSD 上一个 1GB 的文本文件大约需要 30 分钟,所以我猜它不是 IO-bound)。
有没有办法让它在多个线程中并行读取? 因为它可能很重要,我目前被迫 运行 在 Windows 下——以防万一有什么不同。
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df.categorize([ 'Type',
'Condition',
])
df.to_hdf("data/data.hdf", "Measurements", 'w')
是的,dask.dataframe可以并行读取。但是,您 运行 遇到了两个问题:
Pandas.read_csv 仅部分释放 GIL
默认情况下 dask.dataframe 与线程并行化,因为大多数 Pandas 可以 运行 在多个线程中并行化(释放 GIL)。 Pandas.read_csv 是一个例外,特别是如果您生成的数据帧使用对象数据类型作为文本
dask.dataframe.to_hdf(filename) 强制顺序计算
写入单个 HDF 文件将强制顺序计算(很难并行写入单个文件。)
编辑:新解决方案
今天我会避免使用 HDF 而改用 Parquet。我可能会使用多处理或 dask.distributed 调度程序来避免单台机器上的 GIL 问题。这两者的结合应该给你完整的线性缩放。
from dask.distributed import Client
client = Client()
df = dask.dataframe.read_csv(...)
df.to_parquet(...)
解决方案
因为您的数据集可能适合内存,所以使用 dask.dataframe.read_csv 与多个进程并行加载,然后立即切换到 Pandas.
import dask.dataframe as ddf
import dask.multiprocessing
df = ddf.read_csv("data/Measurements*.csv", # read in parallel
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(get=dask.multiprocessing.get) # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
借助@MRocklin 的回答,在较新版本的 dask 中,您可以使用 df.compute(scheduler='processes')
或 df.compute(scheduler='threads')
使用多处理或多线程转换为 pandas:
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(scheduler='processes') # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')