Dask Distributed:在每个 worker 初始化任务上引入图依赖关系
Dask Distributed: Introduce graph dendencies on per worker initialisation tasks
在分布式任务中,任务通过调度程序分布在集群节点上。我希望引入每个节点对提交给节点的任务的依赖性。简而言之,我要执行的计算操作需要:
- 将数据预加载到每个节点上的 GPU 上。
- 在每个节点上使用分块 dask 数组中的其他数据执行 GPU 计算。
我还想在不同的数据集上多次入队 (1) 和 (2)。
我尝试将其设置为一个最小示例:
from __future__ import print_function
import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
get_worker, as_completed)
import numpy as np
cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")
with cluster, Client(cluster.scheduler_address) as client:
workers = client.scheduler_info()['workers'].values()
workers = [v['name'] for v in workers]
print("Workers {}".format(workers))
def init_worker():
get_worker()._stuff = 0
return "OK"
# Call init_worker on each worker. Need pure to
# ensure this happens multiple times
init_futures = [client.submit(init_worker, pure=False,
workers=[w])
for w in workers]
assert all(f.result() == "OK" for f in as_completed(init_futures))
A = da.arange(0, 20, chunks=(5,), dtype=np.float64)
def inc_worker(A):
w = get_worker()
w._stuff += 1
print("{}[{}]".format(w.name, w._stuff))
return A + w._stuff
def increment(A):
""" Call inc_worker """
from dask.base import tokenize
name = 'increment-' + tokenize(A)
dsk = { (name, i): (inc_worker, (A.name, i))
for n, i in A.dask.keys() }
dsk.update(A.dask)
return da.Array(dsk, name, A.chunks, A.dtype)
print(increment(A).compute())
print(increment(A).compute())
我想找到一些方法使提交给每个工作人员的 increment-*
任务依赖于提交给每个工作人员的 init-worker-*
任务。
换句话说,我想避免等待 init_futures
在客户端完成。这引入的问题是,虽然我们知道哪些 init-worker-*
任务与每个工人相关联,但没有明显的方法可以事先知道 increment-*
任务的工人关联。
一种可能的方法:
- 对于每个
inc_worker
调用,生成一个 local_client()
提交一个任务,其中 init-worker-*
in get_worker().data
作为依赖项。我不喜欢这样,因为开销似乎很大。
关于如何做到这一点有什么建议吗?
编辑 1:实际上这无需等待 init_futures
完成即可工作,大概是因为它们在任何 [=11] 之前提交给工作调度程序=] 任务被提交给工人。仍然感觉我在做一个可能并不总是正确的假设...
编辑 2:提到这 2 个步骤应该在不同的数据集上 运行 多次。
部分选项:
使用client.run
并等待。这与您上面的提交技巧一样,但更明确且痛苦更少。但是它确实会阻止,您说过您不想这样做。
client.run(init_worker)
使用 worker --preload 脚本在 worker 启动时 运行 任意代码。参见 http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization
cluster.start_worker(..., preload=['myscript.py'])
使init_worker
幂等(可以运行多次不影响)并且总是在inc_worker
内调用它
def init_worker():
if not hasattr(get_worker(), '_stuff'):
get_worker()._stuff = 0
def inc_worker(...):
init_worker(...)
... do other things ...
此外,我注意到您正在手动构建 dask.arrays。您可能想看看 x.map_blocks(my_func) 和 x.to_delayed/x.from_delayed
在分布式任务中,任务通过调度程序分布在集群节点上。我希望引入每个节点对提交给节点的任务的依赖性。简而言之,我要执行的计算操作需要:
- 将数据预加载到每个节点上的 GPU 上。
- 在每个节点上使用分块 dask 数组中的其他数据执行 GPU 计算。
我还想在不同的数据集上多次入队 (1) 和 (2)。
我尝试将其设置为一个最小示例:
from __future__ import print_function
import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
get_worker, as_completed)
import numpy as np
cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")
with cluster, Client(cluster.scheduler_address) as client:
workers = client.scheduler_info()['workers'].values()
workers = [v['name'] for v in workers]
print("Workers {}".format(workers))
def init_worker():
get_worker()._stuff = 0
return "OK"
# Call init_worker on each worker. Need pure to
# ensure this happens multiple times
init_futures = [client.submit(init_worker, pure=False,
workers=[w])
for w in workers]
assert all(f.result() == "OK" for f in as_completed(init_futures))
A = da.arange(0, 20, chunks=(5,), dtype=np.float64)
def inc_worker(A):
w = get_worker()
w._stuff += 1
print("{}[{}]".format(w.name, w._stuff))
return A + w._stuff
def increment(A):
""" Call inc_worker """
from dask.base import tokenize
name = 'increment-' + tokenize(A)
dsk = { (name, i): (inc_worker, (A.name, i))
for n, i in A.dask.keys() }
dsk.update(A.dask)
return da.Array(dsk, name, A.chunks, A.dtype)
print(increment(A).compute())
print(increment(A).compute())
我想找到一些方法使提交给每个工作人员的 increment-*
任务依赖于提交给每个工作人员的 init-worker-*
任务。
换句话说,我想避免等待 init_futures
在客户端完成。这引入的问题是,虽然我们知道哪些 init-worker-*
任务与每个工人相关联,但没有明显的方法可以事先知道 increment-*
任务的工人关联。
一种可能的方法:
- 对于每个
inc_worker
调用,生成一个local_client()
提交一个任务,其中init-worker-*
inget_worker().data
作为依赖项。我不喜欢这样,因为开销似乎很大。
关于如何做到这一点有什么建议吗?
编辑 1:实际上这无需等待 init_futures
完成即可工作,大概是因为它们在任何 [=11] 之前提交给工作调度程序=] 任务被提交给工人。仍然感觉我在做一个可能并不总是正确的假设...
编辑 2:提到这 2 个步骤应该在不同的数据集上 运行 多次。
部分选项:
使用
client.run
并等待。这与您上面的提交技巧一样,但更明确且痛苦更少。但是它确实会阻止,您说过您不想这样做。client.run(init_worker)
使用 worker --preload 脚本在 worker 启动时 运行 任意代码。参见 http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization
cluster.start_worker(..., preload=['myscript.py'])
使
内调用它init_worker
幂等(可以运行多次不影响)并且总是在inc_worker
def init_worker(): if not hasattr(get_worker(), '_stuff'): get_worker()._stuff = 0 def inc_worker(...): init_worker(...) ... do other things ...
此外,我注意到您正在手动构建 dask.arrays。您可能想看看 x.map_blocks(my_func) 和 x.to_delayed/x.from_delayed