如何将延迟函数调用的顺序执行和并行执行结合起来?
How can I combine sequential as well as parallel execution of delayed function calls?
我被困在一个陌生的地方。我有一堆要按特定顺序执行的延迟函数调用。虽然并行执行很简单:
res = client.compute([myfuncs])
res = client.gather(res)
我似乎找不到以非阻塞方式按顺序执行它们的方法。
这是一个最小的例子:
import numpy as np
from time import sleep
from datetime import datetime
from dask import delayed
from dask.distributed import LocalCluster, Client
@delayed
def dosomething(name):
res = {"name": name, "beg": datetime.now()}
sleep(np.random.randint(10))
res.update(rand=np.random.rand())
res.update(end=datetime.now())
return res
seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]
鉴于上面的例子,我想 运行 seq1
、par1
和 par2
并行,但是 seq1
的组成部分: "foo"、"bar" 和 "baz",依次。
你绝对可以作弊并为你的函数添加一个可选的依赖项,如下所示:
@dask.delayed
def dosomething(name, *args):
...
这样您就可以使任务相互依赖,即使您不在函数的下一个 运行 中使用一个结果:
inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
seq1.append(dosomething(bit, seq1[-1]))
或者,您可以阅读分布式调度程序的 "futures" 界面,通过它您可以实时监控任务的进度。
我被困在一个陌生的地方。我有一堆要按特定顺序执行的延迟函数调用。虽然并行执行很简单:
res = client.compute([myfuncs])
res = client.gather(res)
我似乎找不到以非阻塞方式按顺序执行它们的方法。
这是一个最小的例子:
import numpy as np
from time import sleep
from datetime import datetime
from dask import delayed
from dask.distributed import LocalCluster, Client
@delayed
def dosomething(name):
res = {"name": name, "beg": datetime.now()}
sleep(np.random.randint(10))
res.update(rand=np.random.rand())
res.update(end=datetime.now())
return res
seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]
鉴于上面的例子,我想 运行 seq1
、par1
和 par2
并行,但是 seq1
的组成部分: "foo"、"bar" 和 "baz",依次。
你绝对可以作弊并为你的函数添加一个可选的依赖项,如下所示:
@dask.delayed
def dosomething(name, *args):
...
这样您就可以使任务相互依赖,即使您不在函数的下一个 运行 中使用一个结果:
inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
seq1.append(dosomething(bit, seq1[-1]))
或者,您可以阅读分布式调度程序的 "futures" 界面,通过它您可以实时监控任务的进度。