我可以在用 ctypes 包装的函数上使用 dask.delayed 吗?
Can I use dask.delayed on a function wrapped with ctypes?
我的目标是使用 dask.delayed 并行化代码中某些“易并行”(embarrassingly parallel)的部分。这部分代码会调用一个 Python 函数,而该函数通过 ctypes 包装了一个 C 函数。为了弄清楚我遇到的错误,我写了一个非常简单的示例。
C 函数:
/* Add two doubles and return the result; this is the symbol the Python side
 * loads from zippy.so via ctypes. */
double zippy_sum(double x, double y)
{
    const double total = x + y;
    return total;
}
python:
import ctypes
import os

import dask
from dask.distributed import Client

# Start a local distributed cluster with 4 worker processes.
client = Client(n_workers=4)
client  # in a notebook, this bare expression displays the client summary

# Load zippy.so from the current working directory. (Presumably this runs
# interactively, where os.path.dirname(__file__) is unavailable because
# __file__ is undefined.)
current_dir = os.getcwd()
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
_zippy_sum = _mod.zippy_sum
# Declare the C signature double zippy_sum(double, double).
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double


def zippy(x, y):
    """Return x + y as computed by the C function zippy_sum via ctypes."""
    z = _zippy_sum(x, y)
    return z


# NOTE: this call reproduces the error shown below. `zippy` is defined in
# __main__, so distributed serializes it with cloudpickle, by value, together
# with the globals it references. That includes the ctypes function pointer
# `_zippy_sum`, which cannot be pickled.
result = dask.delayed(zippy)(1., 2.)
result.compute()
回溯:
--------------------------------------------------------------------------- KeyError Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py
in dumps_function(func) 3286 with _cache_lock:
-> 3287 result = cache_dumps[func] 3288 except KeyError:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py
in __getitem__(self, key) 1517 def __getitem__(self, key):
-> 1518 value = super().__getitem__(key) 1519 self.data.move_to_end(key)
~/.edm/envs/evaxi3.6/lib/python3.6/collections/__init__.py in
__getitem__(self, key)
990 return self.__class__.__missing__(self, key)
--> 991 raise KeyError(key)
992 def __setitem__(self, key, item): self.data[key] = item
KeyError: <function zippy at 0x11ffc50d0>
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py
in dumps(x)
40 if b"__main__" in result:
---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
42 else:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py
in dumps(obj, protocol) 1147 cp = CloudPickler(file,
protocol=protocol)
-> 1148 cp.dump(obj) 1149 return file.getvalue()
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py
in dump(self, obj)
490 try:
--> 491 return Pickler.dump(self, obj)
492 except RuntimeError as e:
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj)
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj,
save_persistent_id)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py
in save_function(self, obj, name)
565 else:
--> 566 return self.save_function_tuple(obj)
567
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py
in save_function_tuple(self, func)
779 state['kwdefaults'] = func.__kwdefaults__
--> 780 save(state)
781 write(pickle.TUPLE)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj,
save_persistent_id)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self,
items)
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj,
save_persistent_id)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj)
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self,
items)
851 save(k)
--> 852 save(v)
853 write(SETITEM)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj,
save_persistent_id)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
ValueError: ctypes objects containing pointers cannot be pickled
很遗憾,我还是没看懂这个错误!我刚开始使用 dask,对 ctypes 也只有一些基础经验。有没有人能建议如何解决这个问题,或者哪怕解释一下问题究竟出在哪里?
谢谢!
确实,如果一个函数通过闭包、参数或全局变量引用了 C 函数(即 ctypes 函数指针),它就无法被序列化(pickle)。但是,如果该函数定义在所有 worker 都能导入的模块中,那么序列化时只会记录模块名(和函数名),worker 会自行导入该模块,Python 便能正确处理。
模块 zippy.py(放在 Python 模块搜索路径中的某处,例如本示例的当前目录;worker 也必须能导入它):
"""Thin ctypes wrapper around ``zippy_sum`` from the shared library zippy.so.

Because ``zippy`` lives in an importable module, dask pickles it by reference
(module and function name) instead of by value. Each worker imports this
module and loads zippy.so itself, so no ctypes object is ever pickled.
"""
import ctypes
import os

import dask

# Resolve zippy.so relative to this file rather than the process's working
# directory, so every process that imports the module finds the same library.
current_dir = os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

# Declare the C signature double zippy_sum(double, double) so ctypes converts
# the arguments and the return value correctly.
_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double


def zippy(x, y):
    """Return ``x + y`` as computed by the C function ``zippy_sum``."""
    z = _zippy_sum(x, y)
    return z
主脚本:
import dask
from dask.distributed import Client

import zippy

if __name__ == "__main__":
    # The guard matters when this file is run as a script: worker processes
    # may re-import the main module and must not start clusters of their own.
    with Client(n_workers=4) as client:
        # zippy.zippy is importable, so it is pickled by reference and each
        # worker imports zippy (which loads zippy.so) on its own.
        result = dask.delayed(zippy.zippy)(1., 2.)
        # Print explicitly: a bare expression inside a block is never echoed,
        # not even in a notebook.
        print(result.compute())
如果不想单独创建模块,另一种方案是在函数内部完成所有 C 库的导入和定义(这样被序列化的函数不会引用任何 ctypes 对象):
def zippy(x, y):
    """Return ``x + y`` computed by the C function ``zippy_sum``.

    Every ctypes object is created inside the call, so the pickled function
    references no ctypes pointer; each worker loads zippy.so itself when the
    task runs. ``current_dir`` must be a global path string (the directory
    holding zippy.so) defined by the surrounding script.
    """
    # Imported locally so the function is self-contained when shipped to a
    # worker.
    import ctypes
    import os

    lib = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
    zippy_sum = lib.zippy_sum
    # Declare the C signature double zippy_sum(double, double).
    zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    zippy_sum.restype = ctypes.c_double
    return zippy_sum(x, y)
我的目标是使用 dask.delayed 并行化代码中某些“易并行”(embarrassingly parallel)的部分。这部分代码会调用一个 Python 函数,而该函数通过 ctypes 包装了一个 C 函数。为了弄清楚我遇到的错误,我写了一个非常简单的示例。
C 函数:
/* Add two doubles and return the result; this is the symbol the Python side
 * loads from zippy.so via ctypes. */
double zippy_sum(double x, double y)
{
    const double total = x + y;
    return total;
}
python:
import ctypes
import os

import dask
from dask.distributed import Client

# Start a local distributed cluster with 4 worker processes.
client = Client(n_workers=4)
client  # in a notebook, this bare expression displays the client summary

# Load zippy.so from the current working directory. (Presumably this runs
# interactively, where os.path.dirname(__file__) is unavailable because
# __file__ is undefined.)
current_dir = os.getcwd()
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
_zippy_sum = _mod.zippy_sum
# Declare the C signature double zippy_sum(double, double).
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double


def zippy(x, y):
    """Return x + y as computed by the C function zippy_sum via ctypes."""
    z = _zippy_sum(x, y)
    return z


# NOTE: this call reproduces the error shown below. `zippy` is defined in
# __main__, so distributed serializes it with cloudpickle, by value, together
# with the globals it references. That includes the ctypes function pointer
# `_zippy_sum`, which cannot be pickled.
result = dask.delayed(zippy)(1., 2.)
result.compute()
回溯:
--------------------------------------------------------------------------- KeyError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func) 3286 with _cache_lock: -> 3287 result = cache_dumps[func] 3288 except KeyError:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/utils.py in __getitem__(self, key) 1517 def __getitem__(self, key): -> 1518 value = super().__getitem__(key) 1519 self.data.move_to_end(key)
~/.edm/envs/evaxi3.6/lib/python3.6/collections/__init__.py in __getitem__(self, key) 990 return self.__class__.__missing__(self, key) --> 991 raise KeyError(key) 992 def __setitem__(self, key, item): self.data[key] = item
KeyError: <function zippy at 0x11ffc50d0>
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last) ~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x) 40 if b"__main__" in result: ---> 41 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL) 42 else:
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol) 1147 cp = CloudPickler(file, protocol=protocol) -> 1148 cp.dump(obj) 1149 return file.getvalue()
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj) 490 try: --> 491 return Pickler.dump(self, obj) 492 except RuntimeError as e:
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in dump(self, obj) 408 self.framer.start_framing() --> 409 self.save(obj) 410 self.write(STOP)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name) 565 else: --> 566 return self.save_function_tuple(obj) 567
~/.edm/envs/evaxi3.6/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func) 779 state['kwdefaults'] = func.__kwdefaults__ --> 780 save(state) 781 write(pickle.TUPLE)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 846 save(k) --> 847 save(v) 848 write(SETITEMS)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 475 if f is not None: --> 476 f(self, obj) # Call unbound method with explicit self 477 return
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save_dict(self, obj) 820 self.memoize(obj) --> 821 self._batch_setitems(obj.items()) 822
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in _batch_setitems(self, items) 851 save(k) --> 852 save(v) 853 write(SETITEM)
~/.edm/envs/evaxi3.6/lib/python3.6/pickle.py in save(self, obj, save_persistent_id) 495 if reduce is not None: --> 496 rv = reduce(self.proto) 497 else:
ValueError: ctypes objects containing pointers cannot be pickled
很遗憾,我还是没看懂这个错误!我刚开始使用 dask,对 ctypes 也只有一些基础经验。有没有人能建议如何解决这个问题,或者哪怕解释一下问题究竟出在哪里?
谢谢!
确实,如果一个函数通过闭包、参数或全局变量引用了 C 函数(即 ctypes 函数指针),它就无法被序列化(pickle)。但是,如果该函数定义在所有 worker 都能导入的模块中,那么序列化时只会记录模块名(和函数名),worker 会自行导入该模块,Python 便能正确处理。
模块 zippy.py(放在 Python 模块搜索路径中的某处,例如本示例的当前目录;worker 也必须能导入它):
"""Thin ctypes wrapper around ``zippy_sum`` from the shared library zippy.so.

Because ``zippy`` lives in an importable module, dask pickles it by reference
(module and function name) instead of by value. Each worker imports this
module and loads zippy.so itself, so no ctypes object is ever pickled.
"""
import ctypes
import os

import dask

# Resolve zippy.so relative to this file rather than the process's working
# directory, so every process that imports the module finds the same library.
current_dir = os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

# Declare the C signature double zippy_sum(double, double) so ctypes converts
# the arguments and the return value correctly.
_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double


def zippy(x, y):
    """Return ``x + y`` as computed by the C function ``zippy_sum``."""
    z = _zippy_sum(x, y)
    return z
主脚本:
import dask
from dask.distributed import Client

import zippy

if __name__ == "__main__":
    # The guard matters when this file is run as a script: worker processes
    # may re-import the main module and must not start clusters of their own.
    with Client(n_workers=4) as client:
        # zippy.zippy is importable, so it is pickled by reference and each
        # worker imports zippy (which loads zippy.so) on its own.
        result = dask.delayed(zippy.zippy)(1., 2.)
        # Print explicitly: a bare expression inside a block is never echoed,
        # not even in a notebook.
        print(result.compute())
如果不想单独创建模块,另一种方案是在函数内部完成所有 C 库的导入和定义(这样被序列化的函数不会引用任何 ctypes 对象):
def zippy(x, y):
    """Return ``x + y`` computed by the C function ``zippy_sum``.

    Every ctypes object is created inside the call, so the pickled function
    references no ctypes pointer; each worker loads zippy.so itself when the
    task runs. ``current_dir`` must be a global path string (the directory
    holding zippy.so) defined by the surrounding script.
    """
    # Imported locally so the function is self-contained when shipped to a
    # worker.
    import ctypes
    import os

    lib = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))
    zippy_sum = lib.zippy_sum
    # Declare the C signature double zippy_sum(double, double).
    zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    zippy_sum.restype = ctypes.c_double
    return zippy_sum(x, y)