Dask Distributed:在每个 worker 初始化任务上引入图依赖关系

Dask Distributed: Introduce graph dendencies on per worker initialisation tasks

在分布式任务中,任务通过调度程序分布在集群节点上。我希望引入每个节点对提交给节点的任务的依赖性。简而言之,我要执行的计算操作需要:

  1. 将数据预加载到每个节点上的 GPU 上。
  2. 在每个节点上使用分块 dask 数组中的其他数据执行 GPU 计算。

我还想在不同的数据集上多次入队 (1) 和 (2)。

我尝试将其设置为一个最小示例:

from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers =  [v['name'] for v in workers]

    print("Workers {}".format(workers))

    def init_worker():
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker on each worker. Need pure to
    # ensure this happens multiple times
    init_futures = [client.submit(init_worker, pure=False,
                                            workers=[w])
                                  for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker """
        from dask.base import tokenize
        name = 'increment-' + tokenize(A)
        dsk = { (name, i): (inc_worker, (A.name, i))
                        for n, i in A.dask.keys() }

        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())

我想找到一些方法使提交给每个工作人员的 increment-* 任务依赖于提交给每个工作人员的 init-worker-* 任务。 换句话说,我想避免等待 init_futures 在客户端完成。这引入的问题是,虽然我们知道哪些 init-worker-* 任务与每个工人相关联,但没有明显的方法可以事先知道 increment-* 任务的工人关联。

一种可能的方法:

  1. 对于每个 inc_worker 调用,生成一个 local_client() 提交一个任务,其中 init-worker-* in get_worker().data 作为依赖项。我不喜欢这样,因为开销似乎很大。

关于如何做到这一点有什么建议吗?

编辑 1:实际上这无需等待 init_futures 完成即可工作,大概是因为它们在任何 [=11] 之前提交给工作调度程序=] 任务被提交给工人。仍然感觉我在做一个可能并不总是正确的假设...

编辑 2:提到这 2 个步骤应该在不同的数据集上 运行 多次。

部分选项:

  1. 使用client.run并等待。这与您上面的提交技巧一样,但更明确且痛苦更少。但是它确实会阻止,您说过您不想这样做。

    client.run(init_worker)
    
  2. 使用 worker --preload 脚本在 worker 启动时 运行 任意代码。参见 http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization

    cluster.start_worker(..., preload=['myscript.py'])
    
  3. 使init_worker幂等(可以运行多次不影响)并且总是在inc_worker

    内调用它
    def init_worker():
        if not hasattr(get_worker(), '_stuff'):
            get_worker()._stuff = 0
    
    def inc_worker(...):
        init_worker(...)
        ... do other things ...
    

此外,我注意到您正在手动构建 dask.arrays。您可能想看看 x.map_blocks(my_func)x.to_delayed/x.from_delayed