如何在不使用所有工作人员的情况下限制大量任务

how to throttle a large number of tasks without using all workers

假设我有一个 dask 网格,共有 10 个工人和 40 个核心。这是一个共享网格,所以我不想用我的工作完全饱和它。我有 1000 个任务要做,我想一次提交(并且已经主动 运行)最多 20 个任务。

具体来说,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

如果我设置一个队列系统

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

这会起作用,但是,这只会将我所有的任务转储到网格中,使其饱和。理想情况下我可以:

e.scatter(input_q, max_submit=20)

文档 here 中的示例似乎允许我使用 maxsize 队列。但从用户的角度来看,我仍然需要处理背压。理想情况下 dask 会自动处理这个问题。

使用maxsize=

你非常接近。 scattergathermap 都采用与 Queue 相同的 maxsize= 关键字参数。因此,一个简单的工作流程可能如下所示:

例子

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

所有 qfuturesresults 都是 Python 队列对象。 qresults 队列没有限制,所以他们会贪婪地尽可能多地拉入。然而,futures 队列的最大大小为 20,因此它在任何给定时间只允许 20 个期货在飞行中。一旦 leading future 完成,它将立即被 gather 函数使用,其结果将被放入 results 队列中。这释放了 futures 中的 space 并导致提交另一个任务。

请注意,这并不是您想要的。这些队列是有序的,因此 futures 只有在它们排在队列前面时才会被弹出。如果除第一个之外的所有飞行中的期货都已完成,它们仍将留在队列中,占用 space。考虑到这一限制,您可能希望选择 maxsize= 比您想要的 20 项略多的项。

扩展这个

这里我们做了一个简单的 map->gather 管道,中间没有任何逻辑。您还可以将其他 map 计算放在此处,甚至可以将 futures 从队列中拉出并自行使用它们进行自定义工作。很容易打破上面提供的模式。

github 上发布的解决方案非常有用 - https://github.com/dask/distributed/issues/864

解法:

inputs = iter(inputs)
futures = [c.submit(func, next(inputs)) for i in range(maxsize)]
ac = as_completed(futures)

for finished_future in ac:
    # submit new future 
    try:
        new_future = c.submit(func, next(inputs))
        ac.append(new_future)
    except StopIteration:
        pass
    result = finished_future.result() 
    ... # do stuff with result

查询:

然而,为了确定可用于限制任务的工作人员,我正在尝试使用 client.has_what() api。似乎工作人员的负载不会像状态 UI 页面上显示的那样立即得到反映。有时 has_what 需要相当长的时间来反映任何数据。

是否有另一个 api 可用于确定空闲工人的数量,然后可用于确定油门范围,类似于 UI 正在使用的。