Processes fail to exit() when job count exceeds n
I'm running Python 2.7.2 (default, Jun 12 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] on win32.

I spawn 4 processes and give them two queues - one for jobs, one for results - and join the job queue at the end. When the job count reaches a certain number - e.g. njobs = 10000 - some of the child processes and the main process won't exit, even though all the jobs have completed.

Why is that? Code to illustrate:
def worker(job_queue, result_queue):
    import Queue
    while True:
        try:
            j = job_queue.get(False)
        except Queue.Empty:
            exit('done')
        else:
            result_queue.put_nowait(j)
            job_queue.task_done()

if __name__ == "__main__":
    from multiprocessing import JoinableQueue, Process, cpu_count

    job_queue = JoinableQueue()
    result_queue = JoinableQueue()
    njobs = 10000
    for i in xrange(njobs):
        job_queue.put(i)
    cpus = cpu_count()
    for i in xrange(cpus):
        p = Process(target=worker, args=(job_queue, result_queue))
        p.start()
    job_queue.join()
    print("DONE")
And the longer the jobs are, the fewer of them it takes for some (or all) of the processes to hang. Originally I was using this for sequence matching; with around 500 jobs in the queue, usually 3 processes would hang.
Apparently, having more than 6570 items in the queue can lead to a deadlock (more information in this thread). What you can do is empty the result_queue at the end of the main execution:
while not result_queue.empty():
    result_queue.get(False)
    result_queue.task_done()
print "Done"
Note that you don't need to call exit in the worker function; return is enough:
except Queue.Empty:
    print "done"
    return
You may also consider using a Pool:
from multiprocessing import Pool

def task(arg):
    """Called by the workers"""
    return arg

def callback(arg):
    """Called by the main process"""
    pass

if __name__ == "__main__":
    pool = Pool()
    njobs = 10000
    print "Enqueuing tasks"
    for i in xrange(njobs):
        pool.apply_async(task, (i,), callback=callback)
    print "Closing the pool"
    pool.close()
    print "Joining the pool"
    pool.join()
    print "Done"
This is an implementation limitation of the underlying pipes/sockets, described in detail in Issue 8426: multiprocessing.Queue fails to get() very large objects. Note that it also applies to many small objects.
Solution

Either:
- make sure the result queue is consumed concurrently, fast enough, or
- call Queue.cancel_join_thread() from the child processes (a sketch follows this list)
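For the second option, here is a hedged sketch of the question's worker with the call added - the placement is my illustration, not code from the issue. Mind the trade-off spelled out in the documentation below: with the join thread cancelled, items still buffered in the feeder thread can be lost if the parent isn't reading them.

def worker(job_queue, result_queue):
    import Queue
    # Don't block at exit waiting for the feeder thread to flush
    # result_queue to the pipe; buffered results may be lost if the
    # parent isn't still consuming them (see the docs quoted below).
    result_queue.cancel_join_thread()
    while True:
        try:
            j = job_queue.get(False)
        except Queue.Empty:
            return
        else:
            result_queue.put_nowait(j)
            job_queue.task_done()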
Documentation
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.