如何从父(消费者)进程关闭多处理队列

How to close a multiprocessing queue from a parent (consumer) process

在使用多进程队列进行进程间通信的地方,许多文章建议向队列发送终止消息。 但是,如果子进程是生产者,则可能会按预期失败,从而使消费者得不到通知并等待更多消息。

但是,如果子进程死亡,父进程可以得到通知。 它似乎应该可以通知此进程中的工作线程退出并且不期望更多消息。但是怎么办?

    multiprocessing.Queue.close()

...不通知消费者(真的吗?等等?什么!)

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")

...不允许我等待未完成的工作完成。

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        # messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

...失败,因为队列尚未关闭。

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

... 似乎应该可以工作,但在 worker 中失败并出现 TypeError 异常:

    msg = messageQ.get()
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
            while !quit:
                try:
                   msg = messageQ.get(block=True, timeout=0.5)
                except Empty:
                   continue

... 很糟糕,因为它不必要地要求交易关闭延迟而不限制 CPU。

完整示例

import multiprocessing
import threading


def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    messageQ = multiprocessing.Queue()
    def worker():
        try:
            while True:
                msg = messageQ.get()
                print(msg)
                if msg=="TERMINATE": return
                # messageQ.task_done()
        finally:
            print("Worker quit")
            # messageQ.close()  # End thread
            # messageQ.join_thread()

    thr = threading.Thread(target=worker, 
                           daemon=False)   # The work queue is precious.
    thr.start()

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE") # Notify worker we are done
        messageQ.close()        # No more messages
        messageQ.join_thread()  # Wait for worker to complete

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")

    runProcess()

如果您担心 producer 进程未正常完成,那么我不确定您的问题是什么,因为除了一些更正外,您的代码应该可以正常工作:(1) 它缺少一个import 语句,(2) 没有调用 runProcess 和 (3) 你的工作线程不正确地是一个 daemon 线程(因此它可能最终终止在它有机会处理队列中的所有消息之前)。

我也会使用 None 作为个人偏好(而不是更正)作为特殊的 sentinel 消息而不是 TERMINATE 并删除一些无关的您并不真正需要的队列调用(我没有看到您明确关闭队列来完成任何必要的事情)。

这些是变化:

def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    import multiprocessing
    import threading

    SENTINEL = None

    def worker():
        try:
            while True:
                msg = messageQ.get()
                if msg is SENTINEL:
                    return # No need to print the sentinel
                print(msg)
        finally:
            print("Worker quit")


    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put(SENTINEL) # Notify worker we are done

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")
        thr.join()

    messageQ = multiprocessing.Queue()
    thr = threading.Thread(target=worker)   # The work queue is precious.
    thr.start()
    runProcess()

打印:

1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit