子类化 multiprocessing.queues.Queue
Subclassing multiprocessing.queues.Queue
我以前有这个工作,但在最近的完整 os 更新之后,可能更新了我的 python3 安装,我现在工作的不是。我基本上是在尝试子 class multiprocessing.queue.Queue
class,但我不断收到缺少方法和属性的错误。以下是过去的工作方式:
class q_class(multiprocessing.queues.Queue):
def __init__(self):
# ... my own init stuff goes here
super(q_class, self).__init__(ctx=multiprocessing.get_context())
def put(self,message):
# ... my own put stuff goes here
super(q_class, self).put(message)
def get(self):
# ... my own get stuff goes here
super(q_class, self).get()
def __getstate__(self):
multiprocessing.context.assert_spawning(self) #is this necessary?
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time, self.debug)
def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time,self.debug) = state
#self._after_fork() #is this necessary?
但是当我尝试使用 .get()
方法时,我得到一个错误,提示 _poll()
方法不存在。查看多处理源中的 SimpleQueue subclass creation ,我发现至少有必要显式定义 _poll()
方法,所以我尝试了(只是将上面链接的行复制到我的),然后得到关于 _poll()
需要超时值的错误,这很奇怪,因为我认为 reader class 的 _poll()
默认为 timeout=None
,但是不管怎样,我通过定义自定义方法解决了这个问题:
def _poll(self,timeout=.01):
self._reader._poll(timeout)
这可以克服超时错误,只是现在当我尝试将消息放入队列时,我收到一个错误,指出没有 _closed
属性。再次查看 multiprocessing.queues.Queue
source,当在其 __init__()
中调用 self._reset()
时,应该创建此属性,这应该在我的子 class' __init__()
,但显然它没有,所以我现在想我必须简单地误解了如何 subclass 首先(或者,至少,我的理解已经成为过时)。任何帮助将不胜感激!
嗯,你看到最后一行被注释掉了吗?
#self._after_fork() #is this necessary?
是的,这是必要的。取消注释使代码工作。
我以前有这个工作,但在最近的完整 os 更新之后,可能更新了我的 python3 安装,我现在工作的不是。我基本上是在尝试子 class multiprocessing.queue.Queue
class,但我不断收到缺少方法和属性的错误。以下是过去的工作方式:
class q_class(multiprocessing.queues.Queue):
def __init__(self):
# ... my own init stuff goes here
super(q_class, self).__init__(ctx=multiprocessing.get_context())
def put(self,message):
# ... my own put stuff goes here
super(q_class, self).put(message)
def get(self):
# ... my own get stuff goes here
super(q_class, self).get()
def __getstate__(self):
multiprocessing.context.assert_spawning(self) #is this necessary?
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time, self.debug)
def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time,self.debug) = state
#self._after_fork() #is this necessary?
但是当我尝试使用 .get()
方法时,我得到一个错误,提示 _poll()
方法不存在。查看多处理源中的 SimpleQueue subclass creation ,我发现至少有必要显式定义 _poll()
方法,所以我尝试了(只是将上面链接的行复制到我的),然后得到关于 _poll()
需要超时值的错误,这很奇怪,因为我认为 reader class 的 _poll()
默认为 timeout=None
,但是不管怎样,我通过定义自定义方法解决了这个问题:
def _poll(self,timeout=.01):
self._reader._poll(timeout)
这可以克服超时错误,只是现在当我尝试将消息放入队列时,我收到一个错误,指出没有 _closed
属性。再次查看 multiprocessing.queues.Queue
source,当在其 __init__()
中调用 self._reset()
时,应该创建此属性,这应该在我的子 class' __init__()
,但显然它没有,所以我现在想我必须简单地误解了如何 subclass 首先(或者,至少,我的理解已经成为过时)。任何帮助将不胜感激!
嗯,你看到最后一行被注释掉了吗?
#self._after_fork() #is this necessary?
是的,这是必要的。取消注释使代码工作。