如何从父(消费者)进程关闭多处理队列
How to close a multiprocessing queue from a parent (consumer) process
在使用多进程队列进行进程间通信的地方,许多文章建议向队列发送终止消息。
但是,如果子进程是生产者,则可能会按预期失败,从而使消费者得不到通知并等待更多消息。
但是,如果子进程死亡,父进程可以得到通知。
它似乎应该可以通知此进程中的工作线程退出并且不期望更多消息。但是怎么办?
multiprocessing.Queue.close()
...不通知消费者(真的吗?等等?什么!)
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
...不允许我等待未完成的工作完成。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
# messageQ.close()
messageQ.join_thread() # Wait for worker to complete
...失败,因为队列尚未关闭。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
messageQ.close()
messageQ.join_thread() # Wait for worker to complete
... 似乎应该可以工作,但在 worker 中失败并出现 TypeError 异常:
msg = messageQ.get()
File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
return self._recv(size)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
while !quit:
try:
msg = messageQ.get(block=True, timeout=0.5)
except Empty:
continue
... 很糟糕,因为它不必要地要求交易关闭延迟而不限制 CPU。
完整示例
import multiprocessing
import threading
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
messageQ = multiprocessing.Queue()
def worker():
try:
while True:
msg = messageQ.get()
print(msg)
if msg=="TERMINATE": return
# messageQ.task_done()
finally:
print("Worker quit")
# messageQ.close() # End thread
# messageQ.join_thread()
thr = threading.Thread(target=worker,
daemon=False) # The work queue is precious.
thr.start()
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE") # Notify worker we are done
messageQ.close() # No more messages
messageQ.join_thread() # Wait for worker to complete
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
runProcess()
如果您担心 producer
进程未正常完成,那么我不确定您的问题是什么,因为除了一些更正外,您的代码应该可以正常工作:(1) 它缺少一个import
语句,(2) 没有调用 runProcess
和 (3) 你的工作线程不正确地是一个 daemon 线程(因此它可能最终终止在它有机会处理队列中的所有消息之前)。
我也会使用 None
作为个人偏好(而不是更正)作为特殊的 sentinel 消息而不是 TERMINATE
并删除一些无关的您并不真正需要的队列调用(我没有看到您明确关闭队列来完成任何必要的事情)。
这些是变化:
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
import multiprocessing
import threading
SENTINEL = None
def worker():
try:
while True:
msg = messageQ.get()
if msg is SENTINEL:
return # No need to print the sentinel
print(msg)
finally:
print("Worker quit")
def onProcessQuit(): # Notify worker that we are done.
messageQ.put(SENTINEL) # Notify worker we are done
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
thr.join()
messageQ = multiprocessing.Queue()
thr = threading.Thread(target=worker) # The work queue is precious.
thr.start()
runProcess()
打印:
1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit
在使用多进程队列进行进程间通信的地方,许多文章建议向队列发送终止消息。 但是,如果子进程是生产者,则可能会按预期失败,从而使消费者得不到通知并等待更多消息。
但是,如果子进程死亡,父进程可以得到通知。 它似乎应该可以通知此进程中的工作线程退出并且不期望更多消息。但是怎么办?
multiprocessing.Queue.close()
...不通知消费者(真的吗?等等?什么!)
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
...不允许我等待未完成的工作完成。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
# messageQ.close()
messageQ.join_thread() # Wait for worker to complete
...失败,因为队列尚未关闭。
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE")
messageQ.close()
messageQ.join_thread() # Wait for worker to complete
... 似乎应该可以工作,但在 worker 中失败并出现 TypeError 异常:
msg = messageQ.get()
File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
return self._recv(size)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
while !quit:
try:
msg = messageQ.get(block=True, timeout=0.5)
except Empty:
continue
... 很糟糕,因为它不必要地要求交易关闭延迟而不限制 CPU。
完整示例
import multiprocessing
import threading
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
messageQ = multiprocessing.Queue()
def worker():
try:
while True:
msg = messageQ.get()
print(msg)
if msg=="TERMINATE": return
# messageQ.task_done()
finally:
print("Worker quit")
# messageQ.close() # End thread
# messageQ.join_thread()
thr = threading.Thread(target=worker,
daemon=False) # The work queue is precious.
thr.start()
def onProcessQuit(): # Notify worker that we are done.
messageQ.put("TERMINATE") # Notify worker we are done
messageQ.close() # No more messages
messageQ.join_thread() # Wait for worker to complete
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
runProcess()
如果您担心 producer
进程未正常完成,那么我不确定您的问题是什么,因为除了一些更正外,您的代码应该可以正常工作:(1) 它缺少一个import
语句,(2) 没有调用 runProcess
和 (3) 你的工作线程不正确地是一个 daemon 线程(因此它可能最终终止在它有机会处理队列中的所有消息之前)。
我也会使用 None
作为个人偏好(而不是更正)作为特殊的 sentinel 消息而不是 TERMINATE
并删除一些无关的您并不真正需要的队列调用(我没有看到您明确关闭队列来完成任何必要的事情)。
这些是变化:
def producer(messageQ):
messageQ.put("1")
messageQ.put("2")
messageQ.put("3")
if __name__ == '__main__':
import multiprocessing
import threading
SENTINEL = None
def worker():
try:
while True:
msg = messageQ.get()
if msg is SENTINEL:
return # No need to print the sentinel
print(msg)
finally:
print("Worker quit")
def onProcessQuit(): # Notify worker that we are done.
messageQ.put(SENTINEL) # Notify worker we are done
def runProcess():
proc = multiprocessing.Process(target=producer, args=(messageQ,))
proc.start()
proc.join()
print("runProcess quitting ...")
onProcessQuit()
print("runProcess quitting .. OK")
thr.join()
messageQ = multiprocessing.Queue()
thr = threading.Thread(target=worker) # The work queue is precious.
thr.start()
runProcess()
打印:
1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit