Processes fail to exit() when job count exceeds n

I'm running Python 2.7.2 (default, Jun 12 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] on win32.

I spawn 4 processes, give them two queues (one for jobs, one for results), and join the job queue at the end. Once the job count reaches a certain number, e.g. njobs = 10000, some of the child processes and the main process do not exit, even though all the jobs are done.

Why is that?

Code that demonstrates this:

def worker(job_queue, result_queue):
    import Queue

    while True:
        try:
            j = job_queue.get(False)
        except Queue.Empty:
            exit('done')
        else:
            result_queue.put_nowait(j)
            job_queue.task_done()

if __name__ == "__main__":  
    from multiprocessing import JoinableQueue, Process, cpu_count

    job_queue = JoinableQueue()
    result_queue = JoinableQueue()

    njobs = 10000
    for i in xrange(njobs):
        job_queue.put(i)

    cpus = cpu_count()
    for i in xrange(cpus):
        p = Process(target=worker, args=(job_queue, result_queue))
        p.start()

    job_queue.join()
    print("DONE")

And the longer the jobs are, the smaller the job count needed for some (or all) of the processes to hang. Originally I was doing sequence matching with this; with a queue of around 500 items, 3 processes would usually hang.

Apparently, having more than 6570 items in the queue can lead to a deadlock (more on this in this thread). What you can do is empty the result_queue at the end of the main execution:

while not result_queue.empty():
    result_queue.get(False)
    result_queue.task_done()
print "Done"

Note that in the worker function you don't need to call exit(); return is enough:

except Queue.Empty:
    print "done"
    return
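
Putting the two fixes together, a corrected version of the script might look like this (a sketch: the run() helper, the 1-second get timeouts, and the print()/range() spelling, which also runs under Python 3, are my additions, not the asker's code):

```python
import multiprocessing
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def worker(job_queue, result_queue):
    """Drain the job queue; return (instead of exit()) once it is empty."""
    while True:
        try:
            # A short timeout guards against the pipe looking momentarily
            # empty while the parent's feeder thread is still flushing jobs.
            j = job_queue.get(timeout=1)
        except queue.Empty:
            return
        result_queue.put_nowait(j)
        job_queue.task_done()

def run(njobs):
    job_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.JoinableQueue()
    for i in range(njobs):
        job_queue.put(i)

    procs = [multiprocessing.Process(target=worker,
                                     args=(job_queue, result_queue))
             for _ in range(multiprocessing.cpu_count())]
    for p in procs:
        p.start()

    job_queue.join()          # every job fetched and acknowledged
    # Empty the result queue so the workers' feeder threads can flush
    # their buffers; only then can the worker processes terminate.
    results = []
    while True:
        try:
            results.append(result_queue.get(timeout=1))
            result_queue.task_done()
        except queue.Empty:
            break
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(len(run(10000)))
```

Because the parent now consumes result_queue before joining the workers, no feeder thread is left blocked on a full pipe and all processes exit.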

You could also consider using a Pool:

from multiprocessing import Pool

def task(arg):
    """Called by the workers"""
    return arg

def callback(arg):
    """Called by the main process"""
    pass

if __name__ == "__main__":  
    pool = Pool()
    njobs = 10000
    print "Enqueuing tasks"
    for i in xrange(njobs):
        pool.apply_async(task, (i,), callback=callback)
    print "Closing the pool"
    pool.close()
    print "Joining the pool"
    pool.join()
    print "Done"

This is an implementation limitation of the underlying pipes/sockets, described in detail in Issue 8426: multiprocessing.Queue fails to get() very large objects. Note that it also applies to many small objects.

Solution

Either:

  • make sure the result queue is consumed concurrently, fast enough
  • call Queue.cancel_join_thread() from the subprocesses
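
Calling cancel_join_thread() from the child can be demonstrated in isolation (a sketch: the producer function, the item count, and the 10-second join timeout are my own choices). A child that has put more data than the pipe can buffer would normally block at exit, waiting for the queue's feeder thread to flush; cancel_join_thread() lets it terminate anyway, at the cost of possibly losing the buffered items:

```python
from multiprocessing import Process, Queue

def producer(q):
    # Put far more data than the underlying pipe can buffer.
    for i in range(100000):
        q.put(i)
    # Tell the queue not to join its feeder thread at exit; without this
    # call, the process would block here until the items were consumed.
    q.cancel_join_thread()

if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    p.join(10)  # returns promptly; without cancel_join_thread() it can hang
    print("producer exit code:", p.exitcode)
```

This matches the behaviour described in the documentation quoted below: by default, a process that has put items on a queue waits for the feeder thread before terminating.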

From the documentation:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

Multiprocessing - Programming guidelines