子类化 multiprocessing.queues.Queue

Subclassing multiprocessing.queues.Queue

我以前有这个工作,但在最近的完整 os 更新之后,可能更新了我的 python3 安装,我现在工作的不是。我基本上是在尝试子 class multiprocessing.queue.Queue class,但我不断收到缺少方法和属性的错误。以下是过去的工作方式:

class q_class(multiprocessing.queues.Queue):
    def __init__(self):
        # ... my own init stuff goes here
        super(q_class, self).__init__(ctx=multiprocessing.get_context())
    def put(self,message):
        # ... my own put stuff goes here
        super(q_class, self).put(message)
    def get(self):
        # ... my own get stuff goes here
        super(q_class, self).get()
    def __getstate__(self):
        multiprocessing.context.assert_spawning(self) #is this necessary?
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid,
                self.report_bottlenecks, self.bottleneck_time, self.debug)
    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid,
         self.report_bottlenecks, self.bottleneck_time,self.debug) = state
        #self._after_fork() #is this necessary?

但是当我尝试使用 .get() 方法时,我得到一个错误,提示 _poll() 方法不存在。查看多处理源中的 SimpleQueue subclass creation ,我发现至少有必要显式定义 _poll() 方法,所以我尝试了(只是将上面链接的行复制到我的),然后得到关于 _poll() 需要超时值的错误,这很奇怪,因为我认为 reader class 的 _poll() 默认为 timeout=None,但是不管怎样,我通过定义自定义方法解决了这个问题:

    def _poll(self,timeout=.01):
        self._reader._poll(timeout)

这可以克服超时错误,只是现在当我尝试将消息放入队列时,我收到一个错误,指出没有 _closed 属性。再次查看 multiprocessing.queues.Queue source,当在其 __init__() 中调用 self._reset() 时,应该创建此属性,这应该在我的子 class' __init__(),但显然它没有,所以我现在想我必须简单地误解了如何 subclass 首先(或者,至少,我的理解已经成为过时)。任何帮助将不胜感激!

嗯,你看到最后一行被注释掉了吗?

        #self._after_fork() #is this necessary?

是的,这是必要的。取消注释使代码工作。