在 Dask 中排队工人
Queueing up workers in Dask
我有以下需要使用 Dask 调度程序和工作程序解决的场景:
Dask程序循环调用了N个函数(N个由用户定义)
每个函数并行启动delayed(func)(args)
到运行。
从上一点开始的每个函数启动时,都会触发W个worker。这就是我调用工人的方式:
futures = client.map(worker_func, worker_args)
worker_responses = client.gather(futures)
这意味着我需要 N * W worker 来并行 运行 一切。问题是这不是最优的,因为它分配了太多的资源,我 运行 它在云端并且很昂贵。还有,N是用户定义的,所以我事先不知道我需要有多大的处理能力。
有没有办法让工人排队,如果我定义 Dask 有 X 个工人,当一个工人结束时,下一个工人开始?
首先定义您需要的工作人员数量,将它们视为临时的,但在整个处理过程中都是静态的
您可以动态创建它们(当您开始或稍后),但可能希望在处理开始时就将它们全部准备好
在您看来,client is an executor(因此当您同时提及 workers 和 运行 时 , 你可能是同样的意思
This class resembles executors in concurrent.futures
but also allows Future
objects within submit/map
calls. When a Client is instantiated it takes over all dask.compute
and dask.persist
calls by default.
一旦您的工作人员有空,Dask 将通过调度程序分配给他们的工作
你应该通过将结果传递给 dask.delayed()
和前面的函数结果(这是一个 Future,而不是结果)
来使任何相互依赖的任务这样做
这 Futures-as-arguments 将允许 Dask 构建您的工作的任务图
示例使用https://examples.dask.org/delayed.html
未来参考 https://docs.dask.org/en/latest/futures.html#distributed.Future
依赖期货 dask.delayed
这里有一个来自 Delayed docs 的完整示例(实际上是将几个连续的示例组合成相同的结果)
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b) # depends on a and b
output.append(c)
total = dask.delayed(sum)(output) # depends on everything
total.compute() # 45
你可以打电话给total.visualize()
到see the task graph
Collections 的期货
如果您已经在使用 .map(..)
来映射函数和参数对,您可以继续创建 Futures 然后 .gather(..)
一次创建它们,即使它们在 collection(这里方便你)
.gather()
'ed results
将按照给定的顺序排列(列表列表)
[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]
https://distributed.dask.org/en/latest/api.html#distributed.Client.gather
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
collection_of_futures = []
for worker_func, worker_args in iterable_of_pairs_of_fn_args:
futures = client.map(worker_func, worker_args)
collection_of_futures.append(futures)
results = client.gather(collection_of_futures)
笔记
worker_args
必须是一些可迭代映射到 worker_func
,这可能是错误的来源
.gather()
ing 将阻塞直到所有期货完成或加注
.as_completed()
如果您需要尽快获得结果,可以使用 .as_completed(..)
,但请注意结果将按 non-deterministic 顺序排列,所以我认为这对您来说没有意义case .. 如果你发现它,你需要一些额外的保证
- 在结果中包含有关如何处理结果的信息
- 保留对每个的引用并检查它们
- 只组合无关紧要的组(即所有 Futures 都有相同的目的)
另请注意,产生的期货是完整的,但仍然是 Future,因此您仍然需要调用 .result()
或 .gather()
它们
https://distributed.dask.org/en/latest/api.html#distributed.as_completed
我有以下需要使用 Dask 调度程序和工作程序解决的场景:
Dask程序循环调用了N个函数(N个由用户定义)
每个函数并行启动
delayed(func)(args)
到运行。从上一点开始的每个函数启动时,都会触发W个worker。这就是我调用工人的方式:
futures = client.map(worker_func, worker_args) worker_responses = client.gather(futures)
这意味着我需要 N * W worker 来并行 运行 一切。问题是这不是最优的,因为它分配了太多的资源,我 运行 它在云端并且很昂贵。还有,N是用户定义的,所以我事先不知道我需要有多大的处理能力。
有没有办法让工人排队,如果我定义 Dask 有 X 个工人,当一个工人结束时,下一个工人开始?
首先定义您需要的工作人员数量,将它们视为临时的,但在整个处理过程中都是静态的
您可以动态创建它们(当您开始或稍后),但可能希望在处理开始时就将它们全部准备好
在您看来,client is an executor(因此当您同时提及 workers 和 运行 时 , 你可能是同样的意思
This class resembles executors in
concurrent.futures
but also allowsFuture
objects withinsubmit/map
calls. When a Client is instantiated it takes over alldask.compute
anddask.persist
calls by default.
一旦您的工作人员有空,Dask 将通过调度程序分配给他们的工作
你应该通过将结果传递给 dask.delayed()
和前面的函数结果(这是一个 Future,而不是结果)
来使任何相互依赖的任务这样做
这 Futures-as-arguments 将允许 Dask 构建您的工作的任务图
示例使用https://examples.dask.org/delayed.html
未来参考 https://docs.dask.org/en/latest/futures.html#distributed.Future
依赖期货 dask.delayed
这里有一个来自 Delayed docs 的完整示例(实际上是将几个连续的示例组合成相同的结果)
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b) # depends on a and b
output.append(c)
total = dask.delayed(sum)(output) # depends on everything
total.compute() # 45
你可以打电话给total.visualize()
到see the task graph
Collections 的期货
如果您已经在使用 .map(..)
来映射函数和参数对,您可以继续创建 Futures 然后 .gather(..)
一次创建它们,即使它们在 collection(这里方便你)
.gather()
'ed results
将按照给定的顺序排列(列表列表)
[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]
https://distributed.dask.org/en/latest/api.html#distributed.Client.gather
import dask
from dask.distributed import Client
client = Client(...) # connect to distributed cluster
collection_of_futures = []
for worker_func, worker_args in iterable_of_pairs_of_fn_args:
futures = client.map(worker_func, worker_args)
collection_of_futures.append(futures)
results = client.gather(collection_of_futures)
笔记
worker_args
必须是一些可迭代映射到worker_func
,这可能是错误的来源.gather()
ing 将阻塞直到所有期货完成或加注
.as_completed()
如果您需要尽快获得结果,可以使用 .as_completed(..)
,但请注意结果将按 non-deterministic 顺序排列,所以我认为这对您来说没有意义case .. 如果你发现它,你需要一些额外的保证
- 在结果中包含有关如何处理结果的信息
- 保留对每个的引用并检查它们
- 只组合无关紧要的组(即所有 Futures 都有相同的目的)
另请注意,产生的期货是完整的,但仍然是 Future,因此您仍然需要调用 .result()
或 .gather()
它们
https://distributed.dask.org/en/latest/api.html#distributed.as_completed