python 中的嵌套 dask 工作流?
nested dask workflows in python?
我有一个简单的 dask 工作流程。当我打印出来时,它看起来像这样:
workflow = {
'a_task': (<function a_func at 0x7f1dc5ded598>,),
'b_task': (<function b_func at 0x7f1dc5ded620>,),
'c_task': (<function c_func at 0x7f1ddb07aea0>,),
'd_task': (<function d_func at 0x7f1dc5ded6a8>,),
'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>,
'a_task', 'b_task', 'c_task', 'd_task')
}
原来 b_func
是一个带有 for 循环的小函数,它执行大约 1000 次迭代,大约需要一个小时才能完成。它基本上是这样的:
def b_func(args...):
data = []
for this in that:
data.append(...)
return data
不过,for 循环不必按顺序执行。它可以并行完成。
所以问题是:我该如何处理?我是否应该将该 for 循环转换为工作流并在 b_func
内再次调用 dask?还是我应该把这个过程拉出来扩展原来的工作流程?
基本上,我可以嵌套 dask 工作流还是一个坏主意?
此外,您应该知道我正在使用 from dask.distributed import Client
和 Client.get
以便在整个计算机集群之间分配工作流。我不知道这是否会使 dask.threaded.get
以外的事情变得复杂,但也许会有所作为。我想这意味着其中一个 dask workers
必须在集群的所有机器上设置一个新的调度程序和工作程序,然后将它的工作流传递给它们。也许吧,idk.
有没有人处理过这个问题?
Should I convert that for loop to a workflow and put another call to dask inside of b_func? or should I pull this process out and expand the original workflow?
Basically, can I nest dask workflows or is that a bad idea?
在一般情况下,不,你不应该让任务在 dask 中也调用 compute
。但是,您 可以 使用分布式调度程序执行此操作,并且一切正常。如果在任务中调用 compute 时没有指定调度器,则将使用当前调度器。缺点是提交任务('b_task' 在你的情况下)仍然会一直被阻塞,这会占用一个工作线程(效率较低)。
在你的情况下,我会使用 dask.delayed
(http://docs.dask.org/en/latest/delayed.html) 预先构建整个图表。这允许您编写循环正常的 Python 代码,并让 dask 为您构建图表。有关详细信息,请参阅延迟文档。
我有一个简单的 dask 工作流程。当我打印出来时,它看起来像这样:
workflow = {
'a_task': (<function a_func at 0x7f1dc5ded598>,),
'b_task': (<function b_func at 0x7f1dc5ded620>,),
'c_task': (<function c_func at 0x7f1ddb07aea0>,),
'd_task': (<function d_func at 0x7f1dc5ded6a8>,),
'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>,
'a_task', 'b_task', 'c_task', 'd_task')
}
原来 b_func
是一个带有 for 循环的小函数,它执行大约 1000 次迭代,大约需要一个小时才能完成。它基本上是这样的:
def b_func(args...):
data = []
for this in that:
data.append(...)
return data
不过,for 循环不必按顺序执行。它可以并行完成。
所以问题是:我该如何处理?我是否应该将该 for 循环转换为工作流并在 b_func
内再次调用 dask?还是我应该把这个过程拉出来扩展原来的工作流程?
基本上,我可以嵌套 dask 工作流还是一个坏主意?
此外,您应该知道我正在使用 from dask.distributed import Client
和 Client.get
以便在整个计算机集群之间分配工作流。我不知道这是否会使 dask.threaded.get
以外的事情变得复杂,但也许会有所作为。我想这意味着其中一个 dask workers
必须在集群的所有机器上设置一个新的调度程序和工作程序,然后将它的工作流传递给它们。也许吧,idk.
有没有人处理过这个问题?
Should I convert that for loop to a workflow and put another call to dask inside of b_func? or should I pull this process out and expand the original workflow?
Basically, can I nest dask workflows or is that a bad idea?
在一般情况下,不,你不应该让任务在 dask 中也调用 compute
。但是,您 可以 使用分布式调度程序执行此操作,并且一切正常。如果在任务中调用 compute 时没有指定调度器,则将使用当前调度器。缺点是提交任务('b_task' 在你的情况下)仍然会一直被阻塞,这会占用一个工作线程(效率较低)。
在你的情况下,我会使用 dask.delayed
(http://docs.dask.org/en/latest/delayed.html) 预先构建整个图表。这允许您编写循环正常的 Python 代码,并让 dask 为您构建图表。有关详细信息,请参阅延迟文档。