我可以使用从 Dask/Distributed 中的 .py 文件导入的函数吗?

Can I use functions imported from .py files in Dask/Distributed?

我对序列化和导入有疑问。

In [1]: from distributed import Executor

In [2]: e = Executor('127.0.0.1:8786')

In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>

In [4]: import socket

In [5]: e.run(socket.gethostname)
Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'}

In [6]: %%file mod.py
   ...: def hostname():
   ...:     return 'the hostname'
   ...: 
Overwriting mod.py

In [7]: import mod

In [8]: mod.hostname()
Out[8]: 'the hostname'

In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'

快速回答

将您的 mod.py 文件上传给所有员工。您可以使用您用来设置 dask.distributed 的任何机制来执行此操作,或者您可以使用 upload_file 方法

e.upload_file('mod.py')

或者,如果您的函数是在 IPython 中创建的,而不是作为模块的一部分,它将毫无问题地一起发送。

长答案

这都与函数如何在 Python 中序列化有关。来自模块的函数通过它们的模块名称和函数名称序列化

In [1]: from math import sin

In [2]: import pickle

In [3]: pickle.dumps(sin)
Out[3]: b'\x80\x03cmath\nsin\nq\x00.'

因此,如果客户端机器想要引用 math.sin 函数,它会沿着这个字节串发送(您会注意到其中有 'math''sin' 埋在其他字节中) 到工作机。工作人员看着这个字节串说:“好的,很好,我想要的功能在某某模块中,让我去本地文件系统中找到它。如果该模块不存在,则会引发错误,就像您上面收到的一样。

对于动态创建的函数(您在 IPython 中创建的函数),它使用完全不同的方法,将所有代码捆绑在一起。这种方法通常效果很好。

一般来说,Dask 假设工作人员和客户端都具有相同的软件环境。通常,这主要由设置集群的人使用 Docker 等其他工具来处理。当您有更频繁更新的文件或脚本时,像 upload_file 这样的方法可以填补空白。

对于 运行 集群上导入的函数在工作环境中不可用,您还可以从导入函数创建本地函数。然后这个本地函数将被 cloudpickle 腌制。在 Python 2 中,您可以使用 new.function 实现此目的(参见 new module). For Python 3 this could be achieved with the types module,但我还没有尝试过。

你上面的例子看起来像:

In [3]: import mod

In [4]: import new

In [5]: def remote(func):
   ...:     return new.function(func.func_code, func.func_globals, closure=func.func_closure)
   ...:

In [6]: e.run(remote(mod.hostname))
Out[6]: {'tcp://10.0.2.15:44208': 'the hostname'}

将模块目录添加到 PYTHONPATH 对我有用