为什么 dask 不并行化这个工作流程?
why doesn't dask parallelize this workflow?
我有两个非常简单的函数:
import time
def sleepy(a=1):
time.sleep(a)
print(a)
def ending(*args):
print(args)
print('finished')
我还有一个使用这些功能的 dask 工作流:
workflow = {'task_0': (sleepy, 1),
'task_1': (sleepy, 2),
'task_2': (sleepy, 3),
'ending': (ending, 'task_0', 'task_1', 'task_2')}
这个工作流可以这样形象化:
sleepy
、sleepy
、sleepy
,本应是运行并行,其实不然。
我等待 1 秒,它从 sleepy()
打印 1,然后我等待 2 秒,它打印 2,然后我再等 3 秒,它打印 3:
1
2
3
(None, None, None)
finished
我做错了什么?
这就是我编写您的工作流程的方式,睡眠操作确实是并行发生的
import dask.delayed
import time
@dask.delayed
def sleepy(a=1):
time.sleep(a)
print(a)
@dask.delayed
def ending(*args):
print(args)
print('finished')
d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()
请注意,@
装饰器只是为了语法美观,您还可以做 dask.delayed(sleepy)
等
将 dask.get(
更改为 dask.threaded.get(
解决了我的问题,但我也非常喜欢 mdurant 的回答。
我有两个非常简单的函数:
import time
def sleepy(a=1):
time.sleep(a)
print(a)
def ending(*args):
print(args)
print('finished')
我还有一个使用这些功能的 dask 工作流:
workflow = {'task_0': (sleepy, 1),
'task_1': (sleepy, 2),
'task_2': (sleepy, 3),
'ending': (ending, 'task_0', 'task_1', 'task_2')}
这个工作流可以这样形象化:
sleepy
、sleepy
、sleepy
,本应是运行并行,其实不然。
我等待 1 秒,它从 sleepy()
打印 1,然后我等待 2 秒,它打印 2,然后我再等 3 秒,它打印 3:
1
2
3
(None, None, None)
finished
我做错了什么?
这就是我编写您的工作流程的方式,睡眠操作确实是并行发生的
import dask.delayed
import time
@dask.delayed
def sleepy(a=1):
time.sleep(a)
print(a)
@dask.delayed
def ending(*args):
print(args)
print('finished')
d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()
请注意,@
装饰器只是为了语法美观,您还可以做 dask.delayed(sleepy)
等
将 dask.get(
更改为 dask.threaded.get(
解决了我的问题,但我也非常喜欢 mdurant 的回答。