在 Dask 中排队工人

Queueing up workers in Dask

我有以下需要使用 Dask 调度程序和工作程序解决的场景:

这意味着我需要 N * W worker 来并行 运行 一切。问题是这不是最优的,因为它分配了太多的资源,我 运行 它在云端并且很昂贵。还有,N是用户定义的,所以我事先不知道我需要有多大的处理能力。

有没有办法让工人排队,如果我定义 Dask 有 X 个工人,当一个工人结束时,下一个工人开始?

首先定义您需要的工作人员数量,将它们视为临时的,但在整个处理过程中都是静态的
您可以动态创建它们(当您开始或稍后),但可能希望在处理开始时就将它们全部准备好

在您看来,client is an executor(因此当您同时提及 workers运行 时 , 你可能是同样的意思

This class resembles executors in concurrent.futures but also allows Future objects within submit/map calls. When a Client is instantiated it takes over all dask.compute and dask.persist calls by default.

一旦您的工作人员有空,Dask 将通过调度程序分配给他们的工作

你应该通过将结果传递给 dask.delayed() 和前面的函数结果(这是一个 Future,而不是结果)
来使任何相互依赖的任务这样做 这 Futures-as-arguments 将允许 Dask 构建您的工作的任务图

示例使用https://examples.dask.org/delayed.html
未来参考 https://docs.dask.org/en/latest/futures.html#distributed.Future

依赖期货 dask.delayed

这里有一个来自 Delayed docs 的完整示例(实际上是将几个连续的示例组合成相同的结果)

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)    # depends on a and b
    output.append(c)

total = dask.delayed(sum)(output)  # depends on everything
total.compute()  # 45

你可以打电话给total.visualize()see the task graph


(图片来自 Dask Delayed 文档)

Collections 的期货

如果您已经在使用 .map(..) 来映射函数和参数对,您可以继续创建 Futures 然后 .gather(..) 一次创建它们,即使它们在 collection(这里方便你)

.gather()'ed results 将按照给定的顺序排列(列表列表)

[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]

https://distributed.dask.org/en/latest/api.html#distributed.Client.gather

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

collection_of_futures = []

for worker_func, worker_args in iterable_of_pairs_of_fn_args:
    futures = client.map(worker_func, worker_args)
    collection_of_futures.append(futures)

results = client.gather(collection_of_futures)

笔记

  • worker_args 必须是一些可迭代映射到 worker_func,这可能是错误的来源
  • .gather()ing 将阻塞直到所有期货完成或加注

.as_completed()

如果您需要尽快获得结果,可以使用 .as_completed(..),但请注意结果将按 non-deterministic 顺序排列,所以我认为这对您来说没有意义case .. 如果你发现它,你需要一些额外的保证

  • 在结果中包含有关如何处理结果的信息
  • 保留对每个的引用并检查它们
  • 只组合无关紧要的组(即所有 Futures 都有相同的目的)

另请注意,产生的期货是完整的,但仍然是 Future,因此您仍然需要调用 .result().gather() 它们

https://distributed.dask.org/en/latest/api.html#distributed.as_completed