Process.join() 和队列不适用于大数字

Process.join() and queue don't work with large numbers

我正在尝试拆分 for 循环,即

N = 1000000
for i in xrange(N):
    #do something

使用 multiprocessing.Process 并且它适用于较小的 N 值。 当我使用更大的 N 值时出现问题。在 p.join() 之前或期间发生了一些奇怪的事情并且程序没有响应。如果我在函数 f 的定义中输入 print i,而不是 q.put(i),一切正常。

如有任何帮助,我将不胜感激。这是代码。

from multiprocessing import Process, Queue

def f(q,nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()

您正在尝试无限制地增加您的队列,并且您正在加入一个正在等待队列中的空间的子进程,因此您的主进程停滞不前等待它完成,而且它永远不会。

如果您在加入之前将数据从队列中拉出,它将正常工作。

您可以使用的一种技术如下所示:

while 1:
    running = any(p.is_alive() for p in processes)
    while not queue.empty():
       process_queue_data()
    if not running:
        break

根据文档,p.is_alive() 应该执行隐式连接,但它似乎也暗示最佳做法可能是在此之后对所有线程显式执行连接。

编辑:虽然这很清楚,但可能不是那么高效。如何让它表现得更好将是高度特定于任务和机器的(一般来说,无论如何,你不应该一次创建那么多进程,除非某些进程会在 I/O 上被阻止)。

除了将进程数量减少到 CPU 数量之外,一些简单的修复可以使其更快(同样,取决于具体情况)可能如下所示:

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False))
    except Queue.Empty:
        pass

    time.sleep(0.5)    # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]