Dask:过滤期间 运行 内存不足 (MRE)
Dask: Running out of memory during filtering (MRE)
tl;博士
我想根据列的值过滤 Dask 数据框,即
data.loc[data[column].lt(value)].to_parquet(path)
但是我这样做 运行 内存不足,尽管每个分区都比可用内存小 20 倍。
示例数据
让我们先创建一些示例数据来使用
import numpy as np
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800) # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')
解决方案尝试
假设我的机器只有 512Mb 内存。当然,我的问题规模要大得多(在我的例子中是 TB),但这个简单的问题似乎捕获了我在更大数据集上遇到的同样问题。
因此,我将使用两名工人,每个工人 200Mb
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)
data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)
由于每个分区占用 10Mb 的内存,根据 Wes Kinney 的 rule of thumb,我可能最多需要 50-100Mb 的内存来处理一个分区,所以 200Mb 应该绰绰有余。
然而,当我 运行 task.compute()
工作人员几乎立即 运行 内存不足时,重新启动并随后完全被杀死。
我尝试过的东西
限制资源
我也试过限制worker resources。据我了解,这应该让工作人员知道它一次只能处理一项任务。也许这太保守了,但在那种情况下,我预计会发生死锁,而不是 运行 内存不足。
cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})
然而,遗憾的是结果是一样的
分析读取 parquet 的内存使用情况
SultanOrazbayev 建议我应该使用 memory_profiler
来查看加载单个分区时使用了多少内存,因为 read_parquet
的使用可能是这里的罪魁祸首。
我写了test-load.py
import pandas as pd
@profile
def load_parq():
return pd.read_parquet('sample.parq/part.0.parquet')
if __name__ == '__main__':
df = load_parq()
print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
和运行它使用python3 -m memory_profiler test-load.py
。这是输出:
Memory footprint: 10.0MB
Filename: test-load.py
Line # Mem usage Increment Occurences Line Contents
============================================================
3 74.750 MiB 74.750 MiB 1 @profile
4 def load_parq():
5 132.992 MiB 58.242 MiB 1 return pd.read_parquet('sample.parq/part.0.parquet')
而且——很公平——即使读取单个文件也需要比我想象的更多的内存。 200MB
可能不够,但多少才够?
根据我的设置,两个工人中的每个人的答案大约是 4GB
。这实际上等于整个数据集。事实上,查看仪表板,dask 似乎很乐意同时加载数十个分区。如果它有 4GB
的内存那很好,但是如果它没有那么多我该如何继续?
问题出现在处理的早期 - 在读取数据期间。如果您在 Jupyter Lab 中使用 memory profiler
(对于 Python 脚本使用 pip install memory_profiler
),那么您将看到简单地使用 pandas 加载文件使用的内存是文件大小。在我使用 csv 和 parquet 文件的实验中,内存倍增器大约是底层文件大小的 3 到 10 倍(我使用的是 pandas version 1.2.3
)。
Google 显示 pd.read_csv
和 pd.read_parquet
的高内存使用率是一个反复出现的问题...因此,除非您能找到一种内存高效的加载数据的方式,否则工作人员必须给定更多内存(或文件大小方面的负载更小)。请注意,这是在任何 dask 操作之前出现的问题,因此超出了 resources
选项的控制范围。
我想我已经找到了本案的罪魁祸首。问题是 dask 自动尝试使用尽可能多的可用内核。
from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
print(client)
生产
<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>
当我因此尝试读取 parquet 文件时,dask 使用了所有 48 个可用内核,立即 运行 内存不足。
这里的技巧是限制每个工作线程的数量:
with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
print(client)
产生
<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>
然后计算继续进行,没有任何问题,每个工作人员在任何时间点使用大约 200-250MB。
相关问题
- Difference between dask.distributed LocalCluster with threads vs. processes
tl;博士
我想根据列的值过滤 Dask 数据框,即
data.loc[data[column].lt(value)].to_parquet(path)
但是我这样做 运行 内存不足,尽管每个分区都比可用内存小 20 倍。
示例数据
让我们先创建一些示例数据来使用
import numpy as np
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800) # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')
解决方案尝试
假设我的机器只有 512Mb 内存。当然,我的问题规模要大得多(在我的例子中是 TB),但这个简单的问题似乎捕获了我在更大数据集上遇到的同样问题。
因此,我将使用两名工人,每个工人 200Mb
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)
data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)
由于每个分区占用 10Mb 的内存,根据 Wes Kinney 的 rule of thumb,我可能最多需要 50-100Mb 的内存来处理一个分区,所以 200Mb 应该绰绰有余。
然而,当我 运行 task.compute()
工作人员几乎立即 运行 内存不足时,重新启动并随后完全被杀死。
我尝试过的东西
限制资源
我也试过限制worker resources。据我了解,这应该让工作人员知道它一次只能处理一项任务。也许这太保守了,但在那种情况下,我预计会发生死锁,而不是 运行 内存不足。
cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})
然而,遗憾的是结果是一样的
分析读取 parquet 的内存使用情况
SultanOrazbayev 建议我应该使用 memory_profiler
来查看加载单个分区时使用了多少内存,因为 read_parquet
的使用可能是这里的罪魁祸首。
我写了test-load.py
import pandas as pd
@profile
def load_parq():
return pd.read_parquet('sample.parq/part.0.parquet')
if __name__ == '__main__':
df = load_parq()
print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
和运行它使用python3 -m memory_profiler test-load.py
。这是输出:
Memory footprint: 10.0MB
Filename: test-load.py
Line # Mem usage Increment Occurences Line Contents
============================================================
3 74.750 MiB 74.750 MiB 1 @profile
4 def load_parq():
5 132.992 MiB 58.242 MiB 1 return pd.read_parquet('sample.parq/part.0.parquet')
而且——很公平——即使读取单个文件也需要比我想象的更多的内存。 200MB
可能不够,但多少才够?
根据我的设置,两个工人中的每个人的答案大约是 4GB
。这实际上等于整个数据集。事实上,查看仪表板,dask 似乎很乐意同时加载数十个分区。如果它有 4GB
的内存那很好,但是如果它没有那么多我该如何继续?
问题出现在处理的早期 - 在读取数据期间。如果您在 Jupyter Lab 中使用 memory profiler
(对于 Python 脚本使用 pip install memory_profiler
),那么您将看到简单地使用 pandas 加载文件使用的内存是文件大小。在我使用 csv 和 parquet 文件的实验中,内存倍增器大约是底层文件大小的 3 到 10 倍(我使用的是 pandas version 1.2.3
)。
Google 显示 pd.read_csv
和 pd.read_parquet
的高内存使用率是一个反复出现的问题...因此,除非您能找到一种内存高效的加载数据的方式,否则工作人员必须给定更多内存(或文件大小方面的负载更小)。请注意,这是在任何 dask 操作之前出现的问题,因此超出了 resources
选项的控制范围。
我想我已经找到了本案的罪魁祸首。问题是 dask 自动尝试使用尽可能多的可用内核。
from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
print(client)
生产
<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>
当我因此尝试读取 parquet 文件时,dask 使用了所有 48 个可用内核,立即 运行 内存不足。
这里的技巧是限制每个工作线程的数量:
with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
print(client)
产生
<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>
然后计算继续进行,没有任何问题,每个工作人员在任何时间点使用大约 200-250MB。
相关问题
- Difference between dask.distributed LocalCluster with threads vs. processes