Dask memory usage exploding even for simple computations
I have a parquet folder created with dask, containing multiple files of roughly 100MB each. When I load the dataframe with df = dask.dataframe.read_parquet(path_to_parquet_folder) and run any kind of computation on it (e.g. df.describe().compute()), my kernel crashes.
Things I've noticed (see the diagnostic sketch after this list):
- CPU usage (around 100%) indicates that multi-threading is not being used
- memory usage far exceeds the size of a single file
- the kernel crashes once system memory usage approaches 100%
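For context, this is roughly how one might inspect the loaded frame before triggering a full computation (a minimal sketch; path_to_parquet_folder stands in for the placeholder path above):

import dask.dataframe as dd

path_to_parquet_folder = "path/to/parquet_folder"  # placeholder path

df = dd.read_parquet(path_to_parquet_folder)
print(df.npartitions)  # how many partitions dask will process
print(df.dtypes)       # which dtypes came back from parquet

# computing on a single partition first keeps memory bounded
print(df.get_partition(0).describe().compute())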
Edit:
I tried to create a reproducible example of the crash, without success, but I did find some other oddities, all of which seem related to the newer pandas dtypes I'm using:
import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 10000000
test = pd.DataFrame({
    1: pd.Series(['a', pd.NA] * n, dtype=pd.StringDtype()),
    2: pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
    3: pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
})

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df
dd_df.to_parquet('test.parquet')             # save as parquet directory
dd_df = dd.read_parquet('test.parquet')      # load files back

dd_df.mean().compute()      # compute something
dd_df.describe().compute()  # compute something
dd_df.count().compute()     # compute something
dd_df.max().compute()       # compute something
The outputs are, respectively:
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
Kernel appears to have died.
KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"
It seems that the dtypes are preserved even across the parquet IO, but dask has some trouble actually doing anything with these columns.
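The preservation can be checked directly on the re-loaded frame (a quick sketch):

dd_df = dd.read_parquet('test.parquet')
print(dd_df.dtypes)  # expected: string, Int64, Float64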
Python version: 3.9.7
Dask version: 2021.11.2
It seems that the main error is caused by NAType, which numpy (version 1.21.4) does not yet fully support:
~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
240 # numbers and complex types with non-native byteorder
241 else:
--> 242 x = um.multiply(x, um.conjugate(x), out=x).real
243
244 ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)
TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method
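The same failure can be reproduced with numpy alone: converting a nullable series to a numpy array yields an object array containing pd.NA, which numpy's ufunc loops cannot handle. A minimal sketch:

import numpy as np
import pandas as pd

s = pd.Series([1, pd.NA], dtype=pd.Int64Dtype())
arr = s.to_numpy()  # object array: array([1, <NA>], dtype=object)
np.var(arr)         # raises the same TypeError as in the traceback above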
As a workaround, casting the column to float allows describe to be computed. Note that, to avoid the KeyError, the column names are given as strings rather than ints:
import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 1000
# note that column names are changed to strings rather than ints
test = pd.DataFrame({
    "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
    "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
    "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
})

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df
dd_df.to_parquet("test.parquet", engine="fastparquet")      # save as parquet directory
dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back

dd_df.mean().compute()   # compute something
dd_df.astype({"2": "float"}).describe().compute()  # cast the Int64 column to float so describe works
dd_df.count().compute()  # compute something
dd_df.max().compute()    # compute something
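Alternatively (a sketch, not tested against the original data), the nullable numeric columns can be cast back to numpy-backed dtypes immediately after reading, so that every downstream computation avoids pd.NA; Int64 becomes float64 and pd.NA becomes NaN:

dd_np = dd_df.astype({"2": "float64", "3": "float64"})  # nullable -> numpy dtypes
dd_np.describe().compute()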