Dask:过滤期间 运行 内存不足 (MRE)

Dask: Running out of memory during filtering (MRE)

tl;博士

我想根据列的值过滤 Dask 数据框,即

data.loc[data[column].lt(value)].to_parquet(path)

但是我这样做 运行 内存不足,尽管每个分区都比可用内存小 20 倍。

示例数据

让我们先创建一些示例数据来使用

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800)  # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')

解决方案尝试

假设我的机器只有 512Mb 内存。当然,我的问题规模要大得多(在我的例子中是 TB),但这个简单的问题似乎捕获了我在更大数据集上遇到的同样问题。

因此,我将使用两名工人,每个工人 200Mb

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)

data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)

由于每个分区占用 10Mb 的内存,根据 Wes Kinney 的 rule of thumb,我可能最多需要 50-100Mb 的内存来处理一个分区,所以 200Mb 应该绰绰有余。

然而,当我 运行 task.compute() 工作人员几乎立即 运行 内存不足时,重新启动并随后完全被杀死。

我尝试过的东西

限制资源

我也试过限制worker resources。据我了解,这应该让工作人员知道它一次只能处理一项任务。也许这太保守了,但在那种情况下,我预计会发生死锁,而不是 运行 内存不足。

cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})

然而,遗憾的是结果是一样的

分析读取 parquet 的内存使用情况

SultanOrazbayev 建议我应该使用 memory_profiler 来查看加载单个分区时使用了多少内存,因为 read_parquet 的使用可能是这里的罪魁祸首。

我写了test-load.py

import pandas as pd

@profile
def load_parq():
    return pd.read_parquet('sample.parq/part.0.parquet')

if __name__ == '__main__':
    df = load_parq()
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')

和运行它使用python3 -m memory_profiler test-load.py。这是输出:

Memory footprint: 10.0MB
Filename: test-load.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     3   74.750 MiB   74.750 MiB           1   @profile
     4                                         def load_parq():
     5  132.992 MiB   58.242 MiB           1       return pd.read_parquet('sample.parq/part.0.parquet')

而且——很公平——即使读取单个文件也需要比我想象的更多的内存。 200MB 可能不够,但多少才够?

根据我的设置,两个工人中的每个人的答案大约是 4GB。这实际上等于整个数据集。事实上,查看仪表板,dask 似乎很乐意同时加载数十个分区。如果它有 4GB 的内存那很好,但是如果它没有那么多我该如何继续?

问题出现在处理的早期 - 在读取数据期间。如果您在 Jupyter Lab 中使用 memory profiler(对于 Python 脚本使用 pip install memory_profiler),那么您将看到简单地使用 pandas 加载文件使用的内存是文件大小。在我使用 csv 和 parquet 文件的实验中,内存倍增器大约是底层文件大小的 3 到 10 倍(我使用的是 pandas version 1.2.3)。

Google 显示 pd.read_csvpd.read_parquet 的高内存使用率是一个反复出现的问题...因此,除非您能找到一种内存高效的加载数据的方式,否则工作人员必须给定更多内存(或文件大小方面的负载更小)。请注意,这是在任何 dask 操作之前出现的问题,因此超出了 resources 选项的控制范围。

我想我已经找到了本案的罪魁祸首。问题是 dask 自动尝试使用尽可能多的可用内核。

from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
    print(client)

生产

<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>

当我因此尝试读取 parquet 文件时,dask 使用了所有 48 个可用内核,立即 运行 内存不足。

这里的技巧是限制每个工作线程的数量:

with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
    print(client)

产生

<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>

然后计算继续进行,没有任何问题,每个工作人员在任何时间点使用大约 200-250MB。

相关问题

  • Difference between dask.distributed LocalCluster with threads vs. processes