使用 Dask 将大于内存的数据帧缓存到本地磁盘
Cache larger-than-memory dataframe to local disk with Dask
我在 S3 中有一堆文件,其中包含一个大于内存的数据帧。
目前,我使用 Dask 将文件读入数据帧,使用较小的数据集执行内部连接(每次调用此函数时都会发生变化,而 huge_df
基本上是完整的数据集 &不改变),调用计算得到一个更小的 pandas 数据框,然后做一些处理。例如:
huge_df = ddf.read_csv("s3://folder/**/*.part")
merged_df = huge_df.join(small_df, how='inner', ...)
merged_df = merged_df.compute()
...other processing...
大部分时间都花在了从 S3 下载文件上。我的问题是:有没有办法使用 Dask 将 S3 中的文件缓存到磁盘上,以便在后续调用此代码时,我可以只从磁盘而不是 S3 中读取数据帧文件?我想我不能只调用 huge_df.to_csv(./local-dir/)
,因为那样会把 huge_df
带入内存,这是行不通的。
我确定有一种方法可以结合使用其他工具和标准 Python IO 实用程序来完成此操作,但我想看看是否有一种方法可以使用 Dask 下载文件内容从 S3 并将它们存储在本地磁盘上,而不将所有内容都放入内存中。
做 huge_df.to_csv
会奏效,因为它会在本地将每个分区写入一个单独的文件,所以整个事情不会立即在内存中。
但是,为了回答具体问题,dask 使用 fsspec
来管理文件操作,它允许 local caching,例如,您可以
huge_df = ddf.read_csv("simplecache::s3://folder/**/*.part")
默认情况下,这会将文件存储在临时文件夹中,当您退出 python 会话时该文件夹会被清理,但您可以使用可选参数 storage_options={"simplecache": {..}}
提供选项以指定缓存位置,或者使用 "filecache" 而不是 "simplecache" 如果你想让本地副本在一段时间后过期或检查更新版本的目标。
请注意,显然,只有当所有工作人员都可以访问同一缓存位置时,这些才适用于分布式集群,因为分区加载可能发生在您的任何工作人员身上。
我在 S3 中有一堆文件,其中包含一个大于内存的数据帧。
目前,我使用 Dask 将文件读入数据帧,使用较小的数据集执行内部连接(每次调用此函数时都会发生变化,而 huge_df
基本上是完整的数据集 &不改变),调用计算得到一个更小的 pandas 数据框,然后做一些处理。例如:
huge_df = ddf.read_csv("s3://folder/**/*.part")
merged_df = huge_df.join(small_df, how='inner', ...)
merged_df = merged_df.compute()
...other processing...
大部分时间都花在了从 S3 下载文件上。我的问题是:有没有办法使用 Dask 将 S3 中的文件缓存到磁盘上,以便在后续调用此代码时,我可以只从磁盘而不是 S3 中读取数据帧文件?我想我不能只调用 huge_df.to_csv(./local-dir/)
,因为那样会把 huge_df
带入内存,这是行不通的。
我确定有一种方法可以结合使用其他工具和标准 Python IO 实用程序来完成此操作,但我想看看是否有一种方法可以使用 Dask 下载文件内容从 S3 并将它们存储在本地磁盘上,而不将所有内容都放入内存中。
做 huge_df.to_csv
会奏效,因为它会在本地将每个分区写入一个单独的文件,所以整个事情不会立即在内存中。
但是,为了回答具体问题,dask 使用 fsspec
来管理文件操作,它允许 local caching,例如,您可以
huge_df = ddf.read_csv("simplecache::s3://folder/**/*.part")
默认情况下,这会将文件存储在临时文件夹中,当您退出 python 会话时该文件夹会被清理,但您可以使用可选参数 storage_options={"simplecache": {..}}
提供选项以指定缓存位置,或者使用 "filecache" 而不是 "simplecache" 如果你想让本地副本在一段时间后过期或检查更新版本的目标。
请注意,显然,只有当所有工作人员都可以访问同一缓存位置时,这些才适用于分布式集群,因为分区加载可能发生在您的任何工作人员身上。