为什么 dask 不并行化这个工作流程?

why doesn't dask parallelize this workflow?

我有两个非常简单的函数:

import time

def sleepy(a=1):
    time.sleep(a)
    print(a)

def ending(*args):
    print(args)
    print('finished')

我还有一个使用这些功能的 dask 工作流:

workflow = {'task_0': (sleepy, 1), 
            'task_1': (sleepy, 2), 
            'task_2': (sleepy, 3), 
            'ending': (ending, 'task_0', 'task_1', 'task_2')}

这个工作流可以这样形象化:

sleepysleepysleepy,本应是运行并行,其实不然。

我等待 1 秒,它从 sleepy() 打印 1,然后我等待 2 秒,它打印 2,然后我再等 3 秒,它打印 3:

1
2
3
(None, None, None)
finished

我做错了什么?

这就是我编写您的工作流程的方式,睡眠操作确实是并行发生的

import dask.delayed
import time

@dask.delayed
def sleepy(a=1):
    time.sleep(a)
    print(a)

@dask.delayed
def ending(*args):
    print(args)
    print('finished')

d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()

请注意,@ 装饰器只是为了语法美观,您还可以做 dask.delayed(sleepy)

dask.get( 更改为 dask.threaded.get( 解决了我的问题,但我也非常喜欢 mdurant 的回答。