dask.map_partitions(..).compute() 预期结果大小为 2Gb,在 16Gb 本地机器上引发内存错误

dask.map_partitions(..).compute() with expected 2Gb result size raises Memory error on 16Gb local machine

运行 Dask 集群上的代码通过 map_partitions(...).compute() 预期结果只有 cca​​ 2Gb 大小。

令人惊讶的是,它在我的本地机器上失败并显示 MemoryError!虽然,我确实在为远程集群使用客户端,但远程集群的仪表板也显示集群正在忙于执行此任务,并且集群的内存消耗也达到了峰值(最初我认为集群的内存是问题所在).

我不知道怎么可能,内存问题发生在本地机器内存上? 预期结果大小为 cca 2Gb,而我的笔记本电脑有 16Gb。

dask_client.upload_file(os.path.join(src_folder,'capacity.py'))
result = (read_parquets_separately_by_dask_and_concatenate('hub_to_hub_capacity/2021/10/')
    .map_partitions(capacity.capacity_features,
                    meta=meta_capacity_features,
                    transform_divisions=False)
).compute()

在 Dask 集群(部署在 Azure 中)上,我每个月都有一个每小时的文件。 Dask 独立读取每个文件并将它们连接起来。我的期望是“map_partitions”将处理每个文件的数据块。 同样的代码一天(24 个文件)运行 很快,结果返回 70Mb pandas df。

错误消息显示我的本地库路径。由于内存问题,我遇到了其他应用程序故障。所以,肯定是本地出问题了:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
...
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
 ...
  File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.

为了让 Dask 在您的客户端中构建数据框作为 compute() 的结果,它需要

  • 从 worker 下载一组编码为字节流的结果
  • 解码到内存中 pandas 数据帧
  • 调用 pd.concatenate 将其拼接成单个输出。

连接时,构成的数据帧必须全部在客户端内存中,并且传入的字节流可能尚未释放,因此内存峰值。已经完成了尝试使步骤零复制的工作,但我不知道这里的进展。 public pandas API 分配预期的数据帧并将数据直接写入其中很棘手(参见 fastparquet 确实这样做,代码复杂且容易出错!)。

真的,.compute() 是指一个非常聚合的最终结果。在所有关于内存压力的建议中,使用 pandas 数据帧(有或没有 dask)建议你应该总是有“几次到多次”(取决于进程)数据的内存大小可以自由使用能够顺利操作。

-编辑-

回溯实际上表明问题发生在工作人员打包数据帧以发送给客户端时;但同样的论点适用。