time.sleep() 方法必须在 notebook 和 dask 分布式中重新加载

time.sleep() method has to be reloaded in notebook and dask distributed

我正在做一个基本的 dask 分布式教程并注意到一个奇怪的行为。

from dask.distributed import Client
client = Client(cluster)  # Connect to cluster

然后在我的笔记本中,定义了如下函数

import time
def square(x):
    time.sleep(2)
    return x ** 2

def neg(x):
    time.sleep(2)
    return -x

使用以下单元启动作业

A = client.map(square, range(10))
B = client.map(neg, A)
total = client.submit(sum, B)
%time total.result()

第一次执行上述单元格时,一切正常 运行,挂起时间为 20 秒。 但是如果我重新运行这个cell,wall time只有几毫秒....

为了让事情 运行 再次正常,我必须重新 运行 导入时间、def square()、def neg() cell...

我不明白这种行为。

我理解必须关闭然后重新启动客户端,但重新定义这两个函数?

是的,情况有点微妙。

Dask 将 keys 分配给它跟踪的对象,在本例中为 futures ABtotal。它们的键取决于传递的函数和参数。此外,只要至少有一个未来指向它,未来就会保留在集群的内存中。所以当你重复

A = client.map(square, range(10))

(或者,比方说,第一个,看起来像 submit(square, 0)) Dask 将构建操作的密钥,并在提交时意识到这个确切的事情已经有一个结果,所以工作不会重复 - 它只会得到以前的结果。

请注意,保存先前未来的变量 A 的清理发生在之后 分配了新版本的 A,因此有始终是对未来 live 的引用,集群不会将其从内存中删除。

如果你重新定义函数,Dask 会给它们新的键,并且不会使用之前保存在集群中的结果。 Dask 实际上并不知道函数的 contents(没有改变),只是该函数与之前的对象不同(即,它位于不同的内存位置) .

最后说明:在笔记本中执行有时更难考虑,因为当您重新执行一个单元格或返回到前一个单元格时,您在屏幕上看到的输入顺序并不完全是口译员处理。 运行 代码文件 (.py) 中的相同内容可能具有指导意义。