使用从 parquet 文件创建的 dask 数据帧时内存使用过多
Excessive memory usage when using dask dataframe created from parquet file
我有 800K 行 x 8.7K 列的镶木地板文件。我将它加载到一个 dask 数据框中:
import dask.dataframe as dd
dask_train_df = dd.read_parquet('train.parquet')
dask_train_df.info()
这产生:
<class 'dask.dataframe.core.DataFrame'>
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
当我尝试执行像 dask_train_df.head()
或 dask_train_df.loc[2:4].compute()
这样的简单操作时,我会遇到内存错误,即使是 17+ GB 的 RAM。
但是,如果我这样做:
import pandas as pd
train = pd.read_parquet('../input/train.parquet')
train.info()
产量:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800000 entries, 0 to 799999
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
memory usage: 6.5 GB
我可以 运行 train.head()
和 train.loc[2:4]
没有问题,因为一切都已经在内存中了。
1) 所以我的问题是,为什么这些简单的操作使用 Dask Dataframe 会增加内存使用量,但是当我使用 Pandas Dataframe 将所有内容加载到内存时却能正常工作?
我注意到 npartitions=1
,并且我在文档中看到 read_parquet
"reads a directory of Parquet data into a Dask.dataframe, one file per partition"。就我而言,听起来我正在失去拥有多个分区的所有并行化能力,但是 Dask Dataframe 内存使用量不应该受到单个 Pandas Dataframe 的内存量的限制吗?
2) 另外,还有一个附带问题:如果我想通过在 Dask Dataframe 中分区来并行化这个单个镶木地板文件,我该怎么做?我在 dd.read_parquet
签名中没有看到 blocksize 参数。我也尝试过使用重新分区功能,但我相信沿着行分区和在镶木地板文件中,我想沿着列分区?
首先,我想评论一下,8712 列相当多,您会发现解析 schema/metadata 可能会花费大量时间,更不用说数据加载了。
当 fastparquet 加载数据时,它首先分配一个足够大的数据帧,然后遍历 columns/chunks(具有适当的开销,在这种情况下显然很小)并将值分配到分配的数据帧中。
当您通过 Dask 运行 进行计算(任何计算)时,在许多情况下,输入变量和其他中间对象的内存中可能存在任务内副本。这通常不是问题,因为整个数据集应该被分成许多部分,而小的中间体的内存开销是能够处理大于内存的数据集的值得付出的代价。我不确定您是在哪一点获得副本,这可能值得调查和预防。
在您的例子中,整个数据集是一个分区。这将导致单个加载任务 运行ning 在一个线程中。您不会获得任何并行性,并且任何中间内部副本都适用于整个数据集。您 可以 通过选择列只加载部分数据,然后制造分区并以这种方式实现并行性。但是,处理 parquet 数据的典型方法是使用 "row-group" 分区(即沿着索引)和多个文件,因此避免该问题的真正方法是使用已经适当分区的数据。
请注意,由于您可以使用 fastparquet/pandas 直接加载数据,因此您也可以使用 to_parquet
方法或 fastparquet 的 write 函数保存分区版本。
我有 800K 行 x 8.7K 列的镶木地板文件。我将它加载到一个 dask 数据框中:
import dask.dataframe as dd
dask_train_df = dd.read_parquet('train.parquet')
dask_train_df.info()
这产生:
<class 'dask.dataframe.core.DataFrame'>
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
当我尝试执行像 dask_train_df.head()
或 dask_train_df.loc[2:4].compute()
这样的简单操作时,我会遇到内存错误,即使是 17+ GB 的 RAM。
但是,如果我这样做:
import pandas as pd
train = pd.read_parquet('../input/train.parquet')
train.info()
产量:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800000 entries, 0 to 799999
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
memory usage: 6.5 GB
我可以 运行 train.head()
和 train.loc[2:4]
没有问题,因为一切都已经在内存中了。
1) 所以我的问题是,为什么这些简单的操作使用 Dask Dataframe 会增加内存使用量,但是当我使用 Pandas Dataframe 将所有内容加载到内存时却能正常工作?
我注意到 npartitions=1
,并且我在文档中看到 read_parquet
"reads a directory of Parquet data into a Dask.dataframe, one file per partition"。就我而言,听起来我正在失去拥有多个分区的所有并行化能力,但是 Dask Dataframe 内存使用量不应该受到单个 Pandas Dataframe 的内存量的限制吗?
2) 另外,还有一个附带问题:如果我想通过在 Dask Dataframe 中分区来并行化这个单个镶木地板文件,我该怎么做?我在 dd.read_parquet
签名中没有看到 blocksize 参数。我也尝试过使用重新分区功能,但我相信沿着行分区和在镶木地板文件中,我想沿着列分区?
首先,我想评论一下,8712 列相当多,您会发现解析 schema/metadata 可能会花费大量时间,更不用说数据加载了。
当 fastparquet 加载数据时,它首先分配一个足够大的数据帧,然后遍历 columns/chunks(具有适当的开销,在这种情况下显然很小)并将值分配到分配的数据帧中。
当您通过 Dask 运行 进行计算(任何计算)时,在许多情况下,输入变量和其他中间对象的内存中可能存在任务内副本。这通常不是问题,因为整个数据集应该被分成许多部分,而小的中间体的内存开销是能够处理大于内存的数据集的值得付出的代价。我不确定您是在哪一点获得副本,这可能值得调查和预防。
在您的例子中,整个数据集是一个分区。这将导致单个加载任务 运行ning 在一个线程中。您不会获得任何并行性,并且任何中间内部副本都适用于整个数据集。您 可以 通过选择列只加载部分数据,然后制造分区并以这种方式实现并行性。但是,处理 parquet 数据的典型方法是使用 "row-group" 分区(即沿着索引)和多个文件,因此避免该问题的真正方法是使用已经适当分区的数据。
请注意,由于您可以使用 fastparquet/pandas 直接加载数据,因此您也可以使用 to_parquet
方法或 fastparquet 的 write 函数保存分区版本。