使用 Dask 将大于内存的数据帧缓存到本地磁盘

Cache larger-than-memory dataframe to local disk with Dask

我在 S3 中有一堆文件,其中包含一个大于内存的数据帧。

目前,我使用 Dask 将文件读入数据帧,使用较小的数据集执行内部连接(每次调用此函数时都会发生变化,而 huge_df 基本上是完整的数据集 &不改变),调用计算得到一个更小的 pandas 数据框,然后做一些处理。例如:

huge_df = ddf.read_csv("s3://folder/**/*.part") 
merged_df = huge_df.join(small_df, how='inner', ...)
merged_df = merged_df.compute()
...other processing...

大部分时间都花在了从 S3 下载文件上。我的问题是:有没有办法使用 Dask 将 S3 中的文件缓存到磁盘上,以便在后续调用此代码时,我可以只从磁盘而不是 S3 中读取数据帧文件?我想我不能只调用 huge_df.to_csv(./local-dir/),因为那样会把 huge_df 带入内存,这是行不通的。

我确定有一种方法可以结合使用其他工具和标准 Python IO 实用程序来完成此操作,但我想看看是否有一种方法可以使用 Dask 下载文件内容从 S3 并将它们存储在本地磁盘上,而不将所有内容都放入内存中。

huge_df.to_csv 会奏效,因为它会在本地将每个分区写入一个单独的文件,所以整个事情不会立即在内存中。

但是,为了回答具体问题,dask 使用 fsspec 来管理文件操作,它允许 local caching,例如,您可以

huge_df = ddf.read_csv("simplecache::s3://folder/**/*.part")

默认情况下,这会将文件存储在临时文件夹中,当您退出 python 会话时该文件夹会被清理,但您可以使用可选参数 storage_options={"simplecache": {..}} 提供选项以指定缓存位置,或者使用 "filecache" 而不是 "simplecache" 如果你想让本地副本在一段时间后过期或检查更新版本的目标。

请注意,显然,只有当所有工作人员都可以访问同一缓存位置时,这些才适用于分布式集群,因为分区加载可能发生在您的任何工作人员身上。