我可以在用 ctypes 包装的函数上使用 dask.delayed 吗?

Can I use dask.delayed on a function wrapped with ctypes?

目标是使用 dask.delayed 并行化我的代码的某些 'embarrassingly parallel' 部分。该代码涉及调用一个 python 函数,该函数使用 ctypes 包装一个 c 函数。为了理解我遇到的错误,我写了一个非常基本的例子。

c 函数:

double zippy_sum(double x, double y)
{
return x + y;
}

python:

from dask.distributed import Client
client = Client(n_workers = 4)
client

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

result = dask.delayed(zippy)(1., 2.)
result.compute()

回溯:

--------------------------------------------------------------------------- KeyError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func) 3286 with _cache_lock: -> 3287 result = cache_dumps[func] 3288 except KeyError:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in getitem(self, key) 1517 def getitem(self, key): -> 1518 value = super().getitem(key) 1519 self.data.move_to_end(key)

~/.edm/envs/evaxi3.6/lib/python3.6/collections/init.py in getitem(self, key) 990 return self.class.missing(self, key) --> 991 raise KeyError(key) 992 def setitem(self, key, item): self.data[key] = item

KeyError: function zippy at 0x11ffc50d0

During handling of the above exception, another exception occurred:

ValueError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x) 40 if b"main" in result: ---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) 42 else:

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol) 1147 cp = CloudPickler(file, protocol=protocol) -> 1148 cp.dump(obj) 1149 return file.getvalue()

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj) 490 try: --> 491 return Pickler.dump(self, obj) 492 except RuntimeError as e:

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj) 408 self.framer.start_framing() --> 409 self.save(obj) 410 self.write(STOP)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name) 565 else: --> 566 return self.save_function_tuple(obj) 567

~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func) 779 state['kwdefaults'] = func.kwdefaults --> 780 save(state) 781 write(pickle.TUPLE)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 846 save(k) --> 847 save(v) 848 write(SETITEMS)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 851 save(k) --> 852 save(v) 853 write(SETITEM)

~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 495 if reduce is not None: --> 496 rv = reduce(self.proto) 497 else:

ValueError: ctypes objects containing pointers cannot be pickled

很遗憾,我的错误还是没看懂!我刚刚开始使用 dask,对 ctypes 只有一些基本经验。有没有人对如何解决这个问题有建议,甚至了解需要解决的问题?

谢谢!

确实,您不能序列化在闭包或参数中引用 C 函数的函数。但是,如果您的函数位于所有工作人员都可以访问的模块中,那么您最终只会序列化模块名称,并且 python 做正确的事情。

module zippy.py(在您的 python PATH 中的某处,也许是示例的当前目录):

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

主脚本:

from dask.distributed import Client
import zippy
if __name__ == "__main__":
    # if running as a script, this is helpful
    client = Client(n_workers = 4)

result = dask.delayed(zippy.zippy)(1., 2.)
result.compute()

如果您不想制作模块,另一种解决方案是在函数内完成所有 C 导入和定义。

def zippy(x, y):
    _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

    _zippy_sum = _mod.zippy_sum
    _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    _zippy_sum.restype = ctypes.c_double

    z = _zippy_sum(x, y)

    return z