dask.map_partitions(..).compute() 预期结果大小为 2Gb,在 16Gb 本地机器上引发内存错误
dask.map_partitions(..).compute() with expected 2Gb result size raises Memory error on 16Gb local machine
运行 Dask 集群上的代码通过 map_partitions(...).compute()
预期结果只有 cca 2Gb 大小。
令人惊讶的是,它在我的本地机器上失败并显示 MemoryError
!虽然,我确实在为远程集群使用客户端,但远程集群的仪表板也显示集群正在忙于执行此任务,并且集群的内存消耗也达到了峰值(最初我认为集群的内存是问题所在).
我不知道怎么可能,内存问题发生在本地机器内存上?
预期结果大小为 cca 2Gb,而我的笔记本电脑有 16Gb。
dask_client.upload_file(os.path.join(src_folder,'capacity.py'))
result = (read_parquets_separately_by_dask_and_concatenate('hub_to_hub_capacity/2021/10/')
.map_partitions(capacity.capacity_features,
meta=meta_capacity_features,
transform_divisions=False)
).compute()
在 Dask 集群(部署在 Azure 中)上,我每个月都有一个每小时的文件。 Dask 独立读取每个文件并将它们连接起来。我的期望是“map_partitions”将处理每个文件的数据块。
同样的代码一天(24 个文件)运行 很快,结果返回 70Mb pandas df。
错误消息显示我的本地库路径。由于内存问题,我遇到了其他应用程序故障。所以,肯定是本地出问题了:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
...
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
...
File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.
为了让 Dask 在您的客户端中构建数据框作为 compute()
的结果,它需要
- 从 worker 下载一组编码为字节流的结果
- 解码到内存中 pandas 数据帧
- 调用
pd.concatenate
将其拼接成单个输出。
连接时,构成的数据帧必须全部在客户端内存中,并且传入的字节流可能尚未释放,因此内存峰值。已经完成了尝试使步骤零复制的工作,但我不知道这里的进展。 public pandas API 分配预期的数据帧并将数据直接写入其中很棘手(参见 fastparquet 确实这样做,代码复杂且容易出错!)。
真的,.compute()
是指一个非常聚合的最终结果。在所有关于内存压力的建议中,使用 pandas 数据帧(有或没有 dask)建议你应该总是有“几次到多次”(取决于进程)数据的内存大小可以自由使用能够顺利操作。
-编辑-
回溯实际上表明问题发生在工作人员打包数据帧以发送给客户端时;但同样的论点适用。
运行 Dask 集群上的代码通过 map_partitions(...).compute()
预期结果只有 cca 2Gb 大小。
令人惊讶的是,它在我的本地机器上失败并显示 MemoryError
!虽然,我确实在为远程集群使用客户端,但远程集群的仪表板也显示集群正在忙于执行此任务,并且集群的内存消耗也达到了峰值(最初我认为集群的内存是问题所在).
我不知道怎么可能,内存问题发生在本地机器内存上? 预期结果大小为 cca 2Gb,而我的笔记本电脑有 16Gb。
dask_client.upload_file(os.path.join(src_folder,'capacity.py'))
result = (read_parquets_separately_by_dask_and_concatenate('hub_to_hub_capacity/2021/10/')
.map_partitions(capacity.capacity_features,
meta=meta_capacity_features,
transform_divisions=False)
).compute()
在 Dask 集群(部署在 Azure 中)上,我每个月都有一个每小时的文件。 Dask 独立读取每个文件并将它们连接起来。我的期望是“map_partitions”将处理每个文件的数据块。 同样的代码一天(24 个文件)运行 很快,结果返回 70Mb pandas df。
错误消息显示我的本地库路径。由于内存问题,我遇到了其他应用程序故障。所以,肯定是本地出问题了:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
...
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
...
File "C:\Users\oleg.demidenko\.virtualenvs\interc_predict-5QmpLbYY\lib\site-packages\msgpack\__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack\_packer.pyx", line 120, in msgpack._cmsgpack.Packer.__cinit__
MemoryError: Unable to allocate internal buffer.
为了让 Dask 在您的客户端中构建数据框作为 compute()
的结果,它需要
- 从 worker 下载一组编码为字节流的结果
- 解码到内存中 pandas 数据帧
- 调用
pd.concatenate
将其拼接成单个输出。
连接时,构成的数据帧必须全部在客户端内存中,并且传入的字节流可能尚未释放,因此内存峰值。已经完成了尝试使步骤零复制的工作,但我不知道这里的进展。 public pandas API 分配预期的数据帧并将数据直接写入其中很棘手(参见 fastparquet 确实这样做,代码复杂且容易出错!)。
真的,.compute()
是指一个非常聚合的最终结果。在所有关于内存压力的建议中,使用 pandas 数据帧(有或没有 dask)建议你应该总是有“几次到多次”(取决于进程)数据的内存大小可以自由使用能够顺利操作。
-编辑-
回溯实际上表明问题发生在工作人员打包数据帧以发送给客户端时;但同样的论点适用。