保存到文件时 Dask 崩溃?

Dask crashing when saving to file?

我正在尝试对数据集进行 onehot 编码,然后按特定列进行分组,这样我就可以为该列中的每个项目获取一行,并汇总查看哪些 onehot 列对于该特定行是正确的。它似乎适用于小数据,而使用 dask 似乎适用于大型数据集,但我在尝试保存文件时遇到了问题。我试过 CSV 和镶木地板文件。我想保存结果,稍后我可以分块打开它。

这是显示问题的代码(下面的脚本生成 200 万行和多达 30k 的唯一值以进行 onehot 编码)。

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait

sizeOfRows = 2000000
columnsForDF = 30000 
partitionsforDask = 500 
print("partition is ", partitionsforDask)


cluster = LocalCluster()
client = Client(cluster)
print(client)



df = pd.DataFrame(np.random.randint(0,columnsForDF,size=(sizeOfRows, 2)), columns=list('AB'))
ddf = dd.from_pandas(df, npartitions=partitionsforDask)
# ddf = ddf.persist()
wait(ddf)

# %%time
# need to globally know the categories before one hot encoding
ddf = ddf.categorize(columns=["B"])
one_hot = dd.get_dummies(ddf, columns=['B'])
print("starting groupby")
# result = one_hot.groupby('A').max().persist() # or to_parquet/to_csv/compute/etc.
# result = one_hot.groupby('A', sort=False).max().to_csv('./daskDF.csv', single_file = True)
result = one_hot.groupby('A', sort=False).max().to_parquet('./parquetFile')
wait(result)

在对 csv 或 parquet 进行 groupby 之前,它似乎一直有效。那时,我收到很多关于工作人员超过 95% 内存的错误,然后程序退出并出现“killedworker”异常:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
KilledWorker: ("('dataframe-groupby-max-combine-3ddcd8fc854613101b4bdc7fccde32cd', 1, 0, 0)", <Worker 'tcp://127.0.0.1:33815', name: 6, memory: 0, processing: 22>)

监控我的机器,我从来没有接近超过内存,我的驱动器 space 超过 300 GB,从未使用过(在此过程中没有创建文件,尽管它在 groupby 部分)。

我能做什么?

更新 - 我想我会添加一个奖项。我也遇到了与 .to_csv 相同的问题,因为其他人也遇到了类似的问题,我希望它对广大受众有价值。

让我们首先考虑最终结果:它将是一个具有 30'000 列和 30'000 行的数据框。该对象将占用大约 6.7 GB 的内存。 (可以尝试使用 dtype 来减少内存占用,而且并非所有组合都可能出现在数据中,但为简单起见,我们忽略这些要点)

现在,假设我们只有两个分区,每个分区都包含所有可能的虚拟变量组合。这意味着每个工作人员至少需要 6.7 GB 来存储 .groupby().max() 对象,但最后一步将需要 13.4 GB,因为最终工作人员需要找到这两个对象的 .max。自然地,如果你有更多的分区,对最终 worker 的内存需求将会增加。在 dask 中有一种方法可以通过在相关函数中指定 split_every 来控制它。例如,如果您指定.max(split_every=2),那么任何一个工人将最多收到2个对象(split_every的默认值为8)。

在处理 500 个分区的初期,每个分区很可能只包含可能的虚拟值的一个子集,因此内存要求很低。然而,随着 dask 计算最终结果的进行,它会将具有不同虚拟值组合的对象组合在一起,因此内存需求将在管道末端增加。

原则上,您也可以使用resources来限制一个工人一次可以承担多少任务,但如果工人没有足够的内存来处理这些任务,那将无济于事。

解决这个问题的潜在方法是什么?至少有几个选项:

  • 使用拥有更多资源的工人;

  • 简化任务(例如,根据可能类别的子集将任务分成几个子任务);

  • 使用 delayed/futures 开发自定义工作流,它将对数据进行排序并实施自定义优先级,确保工作人员在进行最终聚合之前完成一部分工作.

如果工作内存是一个约束,那么子集必须非常细粒度。例如,在极限情况下,仅子集化为一个可能的虚拟变量组合将具有非常低的内存需求(初始数据加载和过滤器仍将需要足够的内存来容纳一个分区),但这当然是一个极端的例子,会产生数十个数千个任务,因此建议使用更大的类别组(平衡任务数量和内存需求)。要查看示例,您可以查看相关的 .