为什么来自 s3 的 dask read_csv 保留了这么多内存?

Why is dask read_csv from s3 keeping so much memory?

我正在使用 dask(SQL 查询的替代品)从 s3 读取一些压缩数据。但是,看起来有一些数据文件的缓存,或者在系统内存中某处保存的解压缩文件。注意,这应该是 运行nable,这里的测试数据来自 public s3 存储桶中的 pandas 测试套件。

import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os

#for easier vis
mb = 1048576

def mytestfunc(file):
    process = ps.Process(os.getpid())

    print('initial memory: {0}'.format(process.memory_info().rss/mb))
    data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = {'anon':True})

    print('dask plan memory: {0}'.format(process.memory_info().rss/mb))

    data = data.compute()
    print('data in memory: {0}'.format(process.memory_info().rss/mb))
    print('data frame usage: {0}'.format(data.memory_usage(deep=True).sum()/mb))
    return data

process = ps.Process(os.getpid())
print('before function call: {0}'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: {0}'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function call: {0}'.format(process.memory_info().rss/mb))

这给了我:

before function call: 76.984375
initial memory: 76.984375
dask plan memory: 92.9921875
data in memory: 224.71484375
data frame usage: 38.14704895019531
After function call: 224.7265625

天真地,我希望 'after function call' 是 'before function call' 加上数据帧和一些开销。在这里,gzip 是 43mb,导致大约 90mb 的开销,在我的真实示例中,这个额外的部分是 10gb 数据帧的大约 50gb 额外内存。

如果您对另一个较小的文件重新运行,您可以看到内存已释放 - 取消对较小文件的重新运行 的注释以查看它。这也表明增加是由于文件大小 - 您可以先切换顺序和 运行 'tips',内存保持在 ~90mb。

我猜 dask、s3fs 或 pandas 正在将文件或解压缩的内容保存在某个缓冲区中,但我无法找到它来清除它。

关于如何减少此内存使用或释放​​缓冲区的任何想法?

编辑:我的一些真实数据的上述输出示例 - 32 个 gzip 文件:

before function call: 70.69921875
initial memory: 70.69921875
dask plan memory: 80.16015625
data in memory: 33991.69921875
data frame usage: 10824.553115844727
After function call: 33991.69921875

我知道 dask 会比 pandas 循环使用相同的 32 个文件有更高的内存使用峰值,但我仍然不明白为什么它没有被释放。

在线程中使用 pandas.read_csv 时,Python 进程似乎泄漏了一点内存。我已将其简化为 pandas.read_csvconcurrent.futures.ThreadPoolExecutor 的问题。这是在 Pandas 问题跟踪器上提出的:https://github.com/pandas-dev/pandas/issues/19941

# imports
import pandas as pd
import numpy as np
import time
import psutil
from concurrent.futures import ThreadPoolExecutor

# prep
process = psutil.Process()
e = ThreadPoolExecutor(8)

# prepare csv file, only need to run once
pd.DataFrame(np.random.random((100000, 50))).to_csv('large_random.csv')


# baseline computation making pandas dataframes with threasds.  This works fine

def f(_):
    return pd.DataFrame(np.random.random((1000000, 50)))

print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(f, range(8)))
time.sleep(1)  # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')

# before: 57.0 MB
# after: 56.0 MB

# example with read_csv, this leaks memory
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(pd.read_csv, ['large_random.csv'] * 8))
time.sleep(1)  # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')

# before: 58.0 MB
# after: 323.0 MB