如何在自定义 dask 图中调用 Executor.map?
How to call Executor.map in custom dask graph?
我有一个计算,由 3 "map" 个步骤组成,最后一步取决于前两个步骤的结果。我在多台 PC 上使用 dask.distributed
运行 执行此任务。
依赖图如下所示。
map(func1, list1) -> res_list1-\
| -> create_list_3(res_list1, res_list2)-> list3 -> map(func3, list3)
map(func2, list2) -> res_list2-/
如果我们想象这些计算是独立的,那么调用map
函数3次就很简单了。
from distributed import Executor, progress
def process(jobid):
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
futures.append(e.map(func1, list1))
futures.append(e.map(func2, list2))
futures.append(e.map(func3, list3))
return futures
if __name__ == '__main__':
jobid = 'blah-blah-blah'
r = process(jobid)
progress(r)
然而,list3
是由 func1
和 func2
的结果构建的,它的创建并不容易 map
pable (list1
, list2
、res_list1
和 res_list2
存储在 Postgresql 数据库中,list3
的创建是一个 JOIN
查询,需要一些时间)。
我试图将对 submit
的调用添加到期货列表中,但是,这没有按预期工作:
def process(jobid):
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
futures.append(e.map(func1, list1))
futures.append(e.map(func2, list2))
futures.append(e.submit(create_list_3))
futures.append(e.map(func3, list3))
return futures
在这种情况下,一个 dask-worker
收到了要执行的任务 create_list_3
,但其他人同时收到了要调用 func3
的任务,这就出错了,因为 list3
不存在。
很明显 - 我缺少同步。工人必须停下来等待 list3
的创建完成。
dask
的文档描述了可以提供同步的自定义任务图。
但是,文档中的示例不包括 map
函数,仅包括简单的计算,例如调用 add
和 inc
。
是否可以在我的案例中使用 map
和自定义 dask 图形,或者我应该使用 dask
中未包含的其他方式实现同步?
如果你想 link 任务之间的依赖关系,那么你应该将先前任务的输出传递给另一个任务的输入。
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
futures3 = e.map(func3, futures1, futures2)
对于 func3
的任何调用,Dask 将处理等待,直到输入准备就绪,并将适当的结果从计算的任何地方发送到该函数。
不过,您似乎想通过其他一些自定义方式处理数据传输和同步。如果是这样,那么将一些标记传递给对 func3
的调用可能会很有用。
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
def do_nothing(*args):
return None
token1 = e.submit(do_nothing, futures1)
token2 = e.submit(do_nothing, futures2)
list3 = e.submit(create_list_3)
def func3(arg, tokens=None):
...
futures3 = e.map(func3, list3, tokens=[token1, token2])
这有点 hack,但会强制所有 func3
函数等待,直到它们能够从之前的地图调用中获取令牌结果。
不过,我建议尝试做类似于第一个选项的事情。这将使 dask 在运行和释放资源方面变得更加智能。 token1/2
等障碍会导致次优调度。
我有一个计算,由 3 "map" 个步骤组成,最后一步取决于前两个步骤的结果。我在多台 PC 上使用 dask.distributed
运行 执行此任务。
依赖图如下所示。
map(func1, list1) -> res_list1-\
| -> create_list_3(res_list1, res_list2)-> list3 -> map(func3, list3)
map(func2, list2) -> res_list2-/
如果我们想象这些计算是独立的,那么调用map
函数3次就很简单了。
from distributed import Executor, progress
def process(jobid):
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
futures.append(e.map(func1, list1))
futures.append(e.map(func2, list2))
futures.append(e.map(func3, list3))
return futures
if __name__ == '__main__':
jobid = 'blah-blah-blah'
r = process(jobid)
progress(r)
然而,list3
是由 func1
和 func2
的结果构建的,它的创建并不容易 map
pable (list1
, list2
、res_list1
和 res_list2
存储在 Postgresql 数据库中,list3
的创建是一个 JOIN
查询,需要一些时间)。
我试图将对 submit
的调用添加到期货列表中,但是,这没有按预期工作:
def process(jobid):
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
futures.append(e.map(func1, list1))
futures.append(e.map(func2, list2))
futures.append(e.submit(create_list_3))
futures.append(e.map(func3, list3))
return futures
在这种情况下,一个 dask-worker
收到了要执行的任务 create_list_3
,但其他人同时收到了要调用 func3
的任务,这就出错了,因为 list3
不存在。
很明显 - 我缺少同步。工人必须停下来等待 list3
的创建完成。
dask
的文档描述了可以提供同步的自定义任务图。
但是,文档中的示例不包括 map
函数,仅包括简单的计算,例如调用 add
和 inc
。
是否可以在我的案例中使用 map
和自定义 dask 图形,或者我应该使用 dask
中未包含的其他方式实现同步?
如果你想 link 任务之间的依赖关系,那么你应该将先前任务的输出传递给另一个任务的输入。
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
futures3 = e.map(func3, futures1, futures2)
对于 func3
的任何调用,Dask 将处理等待,直到输入准备就绪,并将适当的结果从计算的任何地方发送到该函数。
不过,您似乎想通过其他一些自定义方式处理数据传输和同步。如果是这样,那么将一些标记传递给对 func3
的调用可能会很有用。
futures1 = e.map(func1, list1)
futures2 = e.map(func2, list2)
def do_nothing(*args):
return None
token1 = e.submit(do_nothing, futures1)
token2 = e.submit(do_nothing, futures2)
list3 = e.submit(create_list_3)
def func3(arg, tokens=None):
...
futures3 = e.map(func3, list3, tokens=[token1, token2])
这有点 hack,但会强制所有 func3
函数等待,直到它们能够从之前的地图调用中获取令牌结果。
不过,我建议尝试做类似于第一个选项的事情。这将使 dask 在运行和释放资源方面变得更加智能。 token1/2
等障碍会导致次优调度。