Dask memory usage exploding even for simple computations

I have a parquet folder created with dask, containing multiple files of roughly 100MB each. When I load the dataframe with df = dask.dataframe.read_parquet(path_to_parquet_folder) and run any kind of computation (e.g. df.describe().compute()), my kernel crashes.

Things I've noticed:

Edit:

I tried to create a reproducible example but failed; in the process, however, I found some other oddities, all seemingly related to the newer nullable pandas dtypes I'm using:

import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 10000000
test = pd.DataFrame({
    1:pd.Series(['a', pd.NA]*n, dtype = pd.StringDtype()), 
    2:pd.Series([1, pd.NA]*n, dtype = pd.Int64Dtype()),
    3:pd.Series([0.56, pd.NA]*n, dtype = pd.Float64Dtype())
})

dd_df = dd.from_pandas(test, npartitions = 2) # convert to dask df

dd_df.to_parquet('test.parquet') # save as parquet directory

dd_df = dd.read_parquet('test.parquet') # load files back

dd_df.mean().compute() # compute something
dd_df.describe().compute() # compute something
dd_df.count().compute() # compute something
dd_df.max().compute() # compute something

The outputs are, respectively:

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

Kernel appears to have died.

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

It seems the dtypes are even preserved across the parquet IO round trip, but dask has some trouble actually doing anything with these columns.
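One contributing factor (my reading, not something confirmed by the dask docs) is that the nullable extension dtypes fall back to object-dtype numpy arrays when handed to numpy, and numpy's reductions handle object arrays poorly:

```python
import pandas as pd

# nullable Int64 data; pd.NA marks the missing value
s = pd.Series([1, pd.NA], dtype="Int64")

# converting to a plain numpy array falls back to object dtype,
# since numpy integers cannot represent a missing value
print(s.to_numpy().dtype)  # object
```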

Python version: 3.9.7, Dask version: 2021.11.2

It seems the main error is due to NAType, which numpy (version 1.21.4) does not yet fully support:

~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
    240     # numbers and complex types with non-native byteorder
    241     else:
--> 242         x = um.multiply(x, um.conjugate(x), out=x).real
    243 
    244     ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)

TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method
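The failure can be reproduced without dask (a minimal sketch; `np.var` goes through the same `_var` code path shown in the frame above): running a variance reduction over an object array that contains `pd.NA` hits the missing `conjugate` method.

```python
import numpy as np
import pandas as pd

# object array containing a pandas NAType, as produced by
# converting a nullable-dtype Series to numpy
arr = pd.Series([1, pd.NA], dtype="Int64").to_numpy()

try:
    np.var(arr)  # for object arrays, _var calls um.conjugate on each element
except TypeError as e:
    print(e)  # NAType has no callable conjugate method
```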

As a workaround, casting the columns to float allows describe to be computed. Note that to avoid the KeyError, the column names are given as strings rather than ints.

import pandas as pd
from dask.diagnostics import ProgressBar

ProgressBar().register()
from dask.diagnostics import ResourceProfiler

rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 1000

# note that column names are changed to strings rather than ints
test = pd.DataFrame(
    {
        "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
        "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
        "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
    }
)

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df

dd_df.to_parquet("test.parquet", engine="fastparquet")  # save as parquet directory

dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back

dd_df.mean().compute()  # compute something
dd_df.astype({"2": "float"}).describe().compute()  # compute something
dd_df.count().compute()  # compute something
dd_df.max().compute()  # compute something
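Why the float cast helps can be seen with plain pandas/numpy (a sketch, not necessarily the exact code path dask takes): casting a nullable column to a numpy-backed float dtype replaces pd.NA with np.nan, which numpy's reductions know how to handle.

```python
import numpy as np
import pandas as pd

s = pd.Series([1, pd.NA], dtype="Int64")

# astype to a numpy-backed float dtype turns pd.NA into np.nan
f = s.astype("float64").to_numpy()
print(f)  # the NA slot is now a regular float nan

# nan-aware reductions now work instead of raising TypeError
print(np.nanvar(f))
```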