使用 dask 加载大型数据集

Loading large datasets with dask

我在一个 HPC 环境中,它有集群、紧密耦合的互连和支持 Lustre 文件系统。我们一直在探索如何利用 Dask 不仅提供计算,而且充当分布式缓存来加速我们的工作流程。我们专有的数据格式是 n 维和规则的,我们编写了一个惰性 reader 来传递给 from_array/from_delayed 方法。

我们在跨 Dask 集群加载和持久化大于内存的数据集时遇到了一些问题。

hdf5 示例:

# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected

我们还从我们自己的一些文件格式中加载了数据集(使用切片、dask.delayed 和 from_delayed),并且随着文件大小的增加,性能也出现了类似的下降。

我的问题:使用 Dask 作为分布式缓存是否存在固有的瓶颈?是否所有数据都将被迫通过调度程序汇集?工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?如果是这样的话,不在海量数据集上调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?

  • 使用 Dask 作为分布式缓存是否存在固有瓶颈?

    每个系统都存在瓶颈,但听起来您还没有接近 运行 我对 Dask 的预期瓶颈。 我怀疑你 运行 喜欢别的东西。

  • 是否会强制所有数据通过调度程序汇集?

    不,工作人员可以执行自己加载数据的功能。这些数据将保留在工人身上。

  • 工作人员是否能够利用 Lustre,或者函数 and/or I/O 是否以某种方式序列化?

    Workers 只是 Python 个进程,因此如果集群上的 Python 个进程 运行 可以利用 Lustre(几乎可以肯定是这种情况)那么是的,Dask Workers 可以充分利用 Lustre。

  • 如果是这样的话,在海量数据集上不调用 persist 而只让 Dask 在需要时处理数据和计算会更有效吗?

    这当然很常见。这里的权衡是在 NFS 的分布式带宽和分布式内存的可用性之间。

在你的位置上,我会使用 Dask 的诊断来找出是什么占用了这么多时间。您可能希望特别阅读有关 understanding performance and the section on the dashboard 的文档。该部分有一个视频可能特别有用。我想问两个问题:

  1. 工人 运行 一直在做任务吗? (状态页面,任务流图)
  2. 在这些任务中,什么占用了时间? (个人资料页面)