python 中的嵌套 dask 工作流?

nested dask workflows in python?

我有一个简单的 dask 工作流程。当我打印出来时,它看起来像这样:

workflow = {
    'a_task': (<function a_func at 0x7f1dc5ded598>,), 
    'b_task': (<function b_func at 0x7f1dc5ded620>,), 
    'c_task': (<function c_func at 0x7f1ddb07aea0>,), 
    'd_task': (<function d_func at 0x7f1dc5ded6a8>,), 
    'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>, 
        'a_task', 'b_task', 'c_task', 'd_task')
}

原来 b_func 是一个带有 for 循环的小函数,它执行大约 1000 次迭代,大约需要一个小时才能完成。它基本上是这样的:

def b_func(args...):
    data = []
    for this in that:
        data.append(...)
    return data

不过,for 循环不必按顺序执行。它可以并行完成。

所以问题是:我该如何处理?我是否应该将该 for 循环转换为工作流并在 b_func 内再次调用 dask?还是我应该把这个过程拉出来扩展原来的工作流程?

基本上,我可以嵌套 dask 工作流还是一个坏主意?

此外,您应该知道我正在使用 from dask.distributed import ClientClient.get 以便在整个计算机集群之间分配工作流。我不知道这是否会使 dask.threaded.get 以外的事情变得复杂,但也许会有所作为。我想这意味着其中一个 dask workers 必须在集群的所有机器上设置一个新的调度程序和工作程序,然后将它的工作流传递给它们。也许吧,idk.

有没有人处理过这个问题?

Should I convert that for loop to a workflow and put another call to dask inside of b_func? or should I pull this process out and expand the original workflow?

Basically, can I nest dask workflows or is that a bad idea?

在一般情况下,不,你不应该让任务在 dask 中也调用 compute。但是,您 可以 使用分布式调度程序执行此操作,并且一切正常。如果在任务中调用 compute 时没有指定调度器,则将使用当前调度器。缺点是提交任务('b_task' 在你的情况下)仍然会一直被阻塞,这会占用一个工作线程(效率较低)。

在你的情况下,我会使用 dask.delayed (http://docs.dask.org/en/latest/delayed.html) 预先构建整个图表。这允许您编写循环正常的 Python 代码,并让 dask 为您构建图表。有关详细信息,请参阅延迟文档。