Dask Distributed - 如何 运行 每个工作人员执行一项任务,使该任务 运行 在工作人员可用的所有内核上运行?

Dask Distributed - how to run one task per worker, making that task running on all cores available into the worker?

我对使用 distributed python 库还很陌生。我有 4 个工作人员,我已经成功地为每个工作人员使用 14 个内核(在 16 个可用内核中)启动了一些并行运行,从而并行执行了 4*14=56 个任务 运行。

但是,如果我希望每个工人一次只执行一项任务,该如何进行。这样,我希望在工作线程上并行使用 14 个内核的任务。

Dask worker 维护一个用于启动任务的线程池。每个任务始终使用该池中的一个线程。你不能告诉一个任务从这个池中获取很多线程。

但是,还有其他方法可以控制和限制 dask worker 中的并发。在您的情况下,您可以考虑定义 worker resources。这可以让您停止来自 运行ning 的许多大型任务同时在相同的工人身上。

在下面的示例中,我们定义每个工人有一个 Foo 资源,并且每个任务需要一个 Foo 到 运行。这将停止来自 运行ning 的任何两个任务同时在同一个工作人员上。

dask-worker scheduler-address:8786 --resources Foo=1
dask-worker scheduler-address:8786 --resources Foo=1

.

from dask.distributed import Client
client = Client('scheduler-address:8786')
futures = client.map(my_expensive_function, ..., resources={'Foo': 1})

下面是一个示例,说明当您希望在 python 中而不是在命令行中启动 worker 时分配资源限制:

from dask.distributed import Client
from dask import delayed
import time
import os

client_with_foo = Client(processes = False,
    n_workers= 2,
    threads_per_worker=10,
    resources = {'foo':1}
               )

@delayed
def do_work(cmd=None, interval=2):
    time.sleep(interval)
    return None


task_graph = []
for i in range(10):
    task_graph.append(do_work())

start = time.time()
result = client_with_foo.compute(task_graph, resources = {'foo':1})
output = client_with_foo.gather(result)
end = time.time()
print(end - start)

分配给两个工作人员的十个 2 秒任务需要 10 秒才能执行,因此上述代码的输出大约为 10。