使用 dask 加载大型数据集
Loading large datasets with dask
我在一个 HPC 环境中,它有集群、紧密耦合的互连和支持 Lustre 文件系统。我们一直在探索如何利用 Dask 不仅提供计算,而且充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维和规则的,我们编写了一个惰性 reader 来传递给 from_array/from_delayed 方法。
我们在跨 Dask 集群加载和持久化大于内存的数据集时遇到了一些问题。
hdf5 示例:
# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected
我们还从我们自己的一些文件格式中加载了数据集(使用切片、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。
我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?是否所有数据都将被迫通过调度程序汇集?工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?如果是这样的话,不在海量数据集上调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?
使用 Dask 作为分布式缓存是否存在固有瓶颈?
每个系统都存在瓶颈,但听起来您还没有接近 运行 我对 Dask 的预期瓶颈。
我怀疑你 运行 喜欢别的东西。
是否会强制所有数据通过调度程序汇集?
不,工作人员可以执行自己加载数据的功能。这些数据将保留在工人身上。
工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?
Workers 只是 Python 个进程,因此如果集群上的 Python 个进程 运行 可以利用 Lustre(几乎可以肯定是这种情况)那么是的,Dask Workers 可以充分利用 Lustre。
如果是这样的话,在海量数据集上不调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?
这当然很常见。这里的权衡是在 NFS 的分布式带宽和分布式内存的可用性之间。
在你的位置上,我会使用 Dask 的诊断来找出是什么占用了这么多时间。您可能希望特别阅读有关 understanding performance and the section on the dashboard 的文档。该部分有一个视频可能特别有用。我想问两个问题:
- 工人 运行 一直在做任务吗? (状态页面,任务流图)
- 在这些任务中,什么占用了时间? (个人资料页面)
我在一个 HPC 环境中,它有集群、紧密耦合的互连和支持 Lustre 文件系统。我们一直在探索如何利用 Dask 不仅提供计算,而且充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维和规则的,我们编写了一个惰性 reader 来传递给 from_array/from_delayed 方法。
我们在跨 Dask 集群加载和持久化大于内存的数据集时遇到了一些问题。
hdf5 示例:
# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected
我们还从我们自己的一些文件格式中加载了数据集(使用切片、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。
我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?是否所有数据都将被迫通过调度程序汇集?工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?如果是这样的话,不在海量数据集上调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?
使用 Dask 作为分布式缓存是否存在固有瓶颈?
每个系统都存在瓶颈,但听起来您还没有接近 运行 我对 Dask 的预期瓶颈。 我怀疑你 运行 喜欢别的东西。
是否会强制所有数据通过调度程序汇集?
不,工作人员可以执行自己加载数据的功能。这些数据将保留在工人身上。
工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?
Workers 只是 Python 个进程,因此如果集群上的 Python 个进程 运行 可以利用 Lustre(几乎可以肯定是这种情况)那么是的,Dask Workers 可以充分利用 Lustre。
如果是这样的话,在海量数据集上不调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?
这当然很常见。这里的权衡是在 NFS 的分布式带宽和分布式内存的可用性之间。
在你的位置上,我会使用 Dask 的诊断来找出是什么占用了这么多时间。您可能希望特别阅读有关 understanding performance and the section on the dashboard 的文档。该部分有一个视频可能特别有用。我想问两个问题:
- 工人 运行 一直在做任务吗? (状态页面,任务流图)
- 在这些任务中,什么占用了时间? (个人资料页面)