从 parquet 读取时,dask dataframe 列重命名很慢(呃)
dask dataframe column renames are slow(er) when read from parquet
我发现每当从 parquet 文件读取数据帧时,dask.dataframe.rename
都会显着增加计算时间:
In [1]: import dask.dataframe as dd
...: df = dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s', random_state=1234)
...:
In [2]: %time df.close.mean().compute()
CPU times: user 7.73 s, sys: 1.15 s, total: 8.88 s
Wall time: 3.5 s
Out[2]: 452.30345234893554
In [3]: %time df = df.rename(columns={col: col.upper() for col in df.columns}); df.CLOSE.mean().compute()
CPU times: user 8.06 s, sys: 1.21 s, total: 9.27 s
Wall time: 3.81 s
In [4]: df.to_parquet('df', compression='GZIP')
...: df = dd.read_parquet('df')
...:
In [5]: %time df.CLOSE.mean().compute()
CPU times: user 4.14 s, sys: 729 ms, total: 4.87 s
Wall time: 2.1 s
Out[5]: 452.30345234893554
In [6]: %time df = df.rename(columns={col: col.lower() for col in df.columns}); df.close.mean().compute()
CPU times: user 9.72 s, sys: 1.89 s, total: 11.6 s
Wall time: 4.81 s
请注意,原始数据帧上的差异很小,但基于镶木地板的数据帧上的差异超过两倍。
这个问题在大型数据集 (~20-30GB) 上被夸大了,我看到 mean
计算从几秒到几分钟。
这是我不知道的 parquet 文件固有的东西,还是某种错误?
这可能是因为重命名方法作用于数据帧的每个分区,我认为它的开销相当于 dd.rename
考虑一下:
In [45]: %time (dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s',
random_state=1234).repartition(npartitions=1).rename(columns = {col:
col.upper() for col in df.columns}).CLOSE.mean().compute())
CPU times: user 11.7 s, sys: 4.65 s, total: 16.3 s
Wall time: 9.23 s
Out[45]: 450.46079905299979
In [46]: %time (dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s',
random_state=1234).repartition(npartitions=1).close.mean().compute())
CPU times: user 11.3 s, sys: 4.63 s, total: 15.9 s
Wall time: 8.8 s
Out[46]: 450.46079905299979
当分区设置为 1 时,重命名开销似乎不像您的示例中那样明显。
更新 1:添加 Parquet 示例
In [103]: data =dd.read_parquet('df').repartition(npartitions=1).rename(columns = {'close':'ClOSE', 'high ':'HIGH', 'low':'LOW', 'open':'OPEN'})
In [104]: %time data.ClOSE.mean().compute()
CPU times: user 9.68 s, sys: 2.84 s, total: 12.5 s
Wall time: 5.72 s
Out[104]: 450.46079905299979
In [105]: data = dd.read_parquet('df').repartition(npartitions=1)
In [106]: %time data.close.mean().compute()
CPU times: user 9.37 s, sys: 2.56 s, total: 11.9 s
Wall time: 5.1 s
Out[106]: 450.46079905299979
更新 2:显式添加列
根据上面 Matt 的回答,避免读取 Parquet 文件的所有列如下所示:
%time dd.read_parquet('df',columns =['close']).rename(columns = {'close':'CLOSE'}).CLOSE.mean().com
...: pute()
CPU times: user 4.65 s, sys: 801 ms, total: 5.45 s
Wall time: 2.71 s
类似于:
%time dd.read_parquet('df',columns =['close']).close.mean().compute()
CPU times: user 4.46 s, sys: 795 ms, total: 5.25 s
Wall time: 2.51 s
Out[110]: 450.46079905300002
旁白:重命名+任务调度在我机器上的单个数据分区上有~40ms
的开销:
In [114]: %timeit -n 3 dd.read_parquet('df',columns =['close']).repartition(npartitions=1).rename(columns = {
...: 'close': 'CLOSE'}).CLOSE.mean().compute()
3 loops, best of 3: 2.36 s per loop
In [115]: %timeit -n 3 dd.read_parquet('df',columns =['close']).repartition(npartitions=1).close.mean().compu
...: te()
3 loops, best of 3: 2.32 s per loop
应用于大约 500 个分区,大约 20 秒。以防万一,这种东西以后有用。
Parquet 是列式存储。从 parquet 文件中读取单个列比读取整个数据集要快得多。当你做 df.close.mean().compute()
时,Dask 注意到你有一个 read_parquet
操作,紧接着是一个列访问操作,它可以智能地将它们融合到更智能的东西,如下所示:
df = dd.read_parquet(filename, columns=['close'])
但是,当您在 read_parquet
调用和列访问操作之间抛出 rename
操作时,Dask.dataframe 不够智能,无法意识到它可以交换列访问和重命名,因此您最终会从 parquet 文件中读取所有数据,重命名列,然后丢弃除一列之外的所有列。
缺乏对计算进行高级推理的能力正是数据库或更高级的系统(如 Spark Dataframes)开始胜过 Dask.dataframe 的地方。 Dask 的核心通常是较低级别的,因此可以进行更疯狂的计算,但除了最基本的查询优化外,它失去了执行任何操作的能力。
所以在这种情况下,并不是 rename
正在减慢速度,而是 rename
在一个非常简单的优化方案中投入了扳手。
我发现每当从 parquet 文件读取数据帧时,dask.dataframe.rename
都会显着增加计算时间:
In [1]: import dask.dataframe as dd
...: df = dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s', random_state=1234)
...:
In [2]: %time df.close.mean().compute()
CPU times: user 7.73 s, sys: 1.15 s, total: 8.88 s
Wall time: 3.5 s
Out[2]: 452.30345234893554
In [3]: %time df = df.rename(columns={col: col.upper() for col in df.columns}); df.CLOSE.mean().compute()
CPU times: user 8.06 s, sys: 1.21 s, total: 9.27 s
Wall time: 3.81 s
In [4]: df.to_parquet('df', compression='GZIP')
...: df = dd.read_parquet('df')
...:
In [5]: %time df.CLOSE.mean().compute()
CPU times: user 4.14 s, sys: 729 ms, total: 4.87 s
Wall time: 2.1 s
Out[5]: 452.30345234893554
In [6]: %time df = df.rename(columns={col: col.lower() for col in df.columns}); df.close.mean().compute()
CPU times: user 9.72 s, sys: 1.89 s, total: 11.6 s
Wall time: 4.81 s
请注意,原始数据帧上的差异很小,但基于镶木地板的数据帧上的差异超过两倍。
这个问题在大型数据集 (~20-30GB) 上被夸大了,我看到 mean
计算从几秒到几分钟。
这是我不知道的 parquet 文件固有的东西,还是某种错误?
这可能是因为重命名方法作用于数据帧的每个分区,我认为它的开销相当于 dd.rename
考虑一下:
In [45]: %time (dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s',
random_state=1234).repartition(npartitions=1).rename(columns = {col:
col.upper() for col in df.columns}).CLOSE.mean().compute())
CPU times: user 11.7 s, sys: 4.65 s, total: 16.3 s
Wall time: 9.23 s
Out[45]: 450.46079905299979
In [46]: %time (dd.demo.daily_stock('GOOG', '2008', '2010', freq='1s',
random_state=1234).repartition(npartitions=1).close.mean().compute())
CPU times: user 11.3 s, sys: 4.63 s, total: 15.9 s
Wall time: 8.8 s
Out[46]: 450.46079905299979
当分区设置为 1 时,重命名开销似乎不像您的示例中那样明显。
更新 1:添加 Parquet 示例
In [103]: data =dd.read_parquet('df').repartition(npartitions=1).rename(columns = {'close':'ClOSE', 'high ':'HIGH', 'low':'LOW', 'open':'OPEN'})
In [104]: %time data.ClOSE.mean().compute()
CPU times: user 9.68 s, sys: 2.84 s, total: 12.5 s
Wall time: 5.72 s
Out[104]: 450.46079905299979
In [105]: data = dd.read_parquet('df').repartition(npartitions=1)
In [106]: %time data.close.mean().compute()
CPU times: user 9.37 s, sys: 2.56 s, total: 11.9 s
Wall time: 5.1 s
Out[106]: 450.46079905299979
更新 2:显式添加列
根据上面 Matt 的回答,避免读取 Parquet 文件的所有列如下所示:
%time dd.read_parquet('df',columns =['close']).rename(columns = {'close':'CLOSE'}).CLOSE.mean().com
...: pute()
CPU times: user 4.65 s, sys: 801 ms, total: 5.45 s
Wall time: 2.71 s
类似于:
%time dd.read_parquet('df',columns =['close']).close.mean().compute()
CPU times: user 4.46 s, sys: 795 ms, total: 5.25 s
Wall time: 2.51 s
Out[110]: 450.46079905300002
旁白:重命名+任务调度在我机器上的单个数据分区上有~40ms
的开销:
In [114]: %timeit -n 3 dd.read_parquet('df',columns =['close']).repartition(npartitions=1).rename(columns = {
...: 'close': 'CLOSE'}).CLOSE.mean().compute()
3 loops, best of 3: 2.36 s per loop
In [115]: %timeit -n 3 dd.read_parquet('df',columns =['close']).repartition(npartitions=1).close.mean().compu
...: te()
3 loops, best of 3: 2.32 s per loop
应用于大约 500 个分区,大约 20 秒。以防万一,这种东西以后有用。
Parquet 是列式存储。从 parquet 文件中读取单个列比读取整个数据集要快得多。当你做 df.close.mean().compute()
时,Dask 注意到你有一个 read_parquet
操作,紧接着是一个列访问操作,它可以智能地将它们融合到更智能的东西,如下所示:
df = dd.read_parquet(filename, columns=['close'])
但是,当您在 read_parquet
调用和列访问操作之间抛出 rename
操作时,Dask.dataframe 不够智能,无法意识到它可以交换列访问和重命名,因此您最终会从 parquet 文件中读取所有数据,重命名列,然后丢弃除一列之外的所有列。
缺乏对计算进行高级推理的能力正是数据库或更高级的系统(如 Spark Dataframes)开始胜过 Dask.dataframe 的地方。 Dask 的核心通常是较低级别的,因此可以进行更疯狂的计算,但除了最基本的查询优化外,它失去了执行任何操作的能力。
所以在这种情况下,并不是 rename
正在减慢速度,而是 rename
在一个非常简单的优化方案中投入了扳手。