分布式Dask如何高效提交大参数任务?

How to efficiently submit tasks with large arguments in Dask distributed?

我想提交带有大(千兆字节)参数的 Dask 函数。做这个的最好方式是什么?我想 运行 这个函数多次使用不同的(小)参数。

例子(不好)

这使用 concurrent.futures 界面。我们可以同样轻松地使用 dask.delayed 界面。

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

from dask.distributed import Client
c = Client()

futures = [c.submit(f, x, param) for param in params]

但这比我预期的要慢或导致内存错误。

好的,所以这里的问题是每个任务都包含很大的 numpy 数组 x。对于我们提交的 100 个任务中的每一个,我们都需要序列化 ​​x,将其发送给调度程序,将其发送给工作人员,等等。

相反,我们会将数组发送到集群一次:

[future] = c.scatter([x])

现在 future 是指向集群上数组 x 的标记。现在我们可以提交引用这个远程未来的任务,而不是我们本地客户端上的 numpy 数组。

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

现在速度更快,让 Dask 更有效地控制数据移动。

向所有工作人员分散数据

如果您希望最终需要将数组 x 移动到所有工作人员,那么您可能希望广播数组以开始

[future] = c.scatter([x], broadcast=True)

延迟使用 Dask

期货也适用于 dask.delayed。这里没有性能优势,但有些人更喜欢这个接口:

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)