Process.join() 和队列不适用于大数字
Process.join() and queue don't work with large numbers
我正在尝试拆分 for 循环,即
N = 1000000
for i in xrange(N):
#do something
使用 multiprocessing.Process 并且它适用于较小的 N 值。
当我使用更大的 N 值时出现问题。在 p.join() 之前或期间发生了一些奇怪的事情并且程序没有响应。如果我在函数 f 的定义中输入 print i,而不是 q.put(i),一切正常。
如有任何帮助,我将不胜感激。这是代码。
from multiprocessing import Process, Queue
def f(q,nMin, nMax): # function for multiprocessing
for i in xrange(nMin,nMax):
q.put(i)
if __name__ == '__main__':
nEntries = 1000000
nCpu = 10
nEventsPerCpu = nEntries/nCpu
processes = []
q = Queue()
for i in xrange(nCpu):
processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )
for p in processes:
p.start()
for p in processes:
p.join()
print q.qsize()
您正在尝试无限制地增加您的队列,并且您正在加入一个正在等待队列中的空间的子进程,因此您的主进程停滞不前等待它完成,而且它永远不会。
如果您在加入之前将数据从队列中拉出,它将正常工作。
您可以使用的一种技术如下所示:
while 1:
running = any(p.is_alive() for p in processes)
while not queue.empty():
process_queue_data()
if not running:
break
根据文档,p.is_alive() 应该执行隐式连接,但它似乎也暗示最佳做法可能是在此之后对所有线程显式执行连接。
编辑:虽然这很清楚,但可能不是那么高效。如何让它表现得更好将是高度特定于任务和机器的(一般来说,无论如何,你不应该一次创建那么多进程,除非某些进程会在 I/O 上被阻止)。
除了将进程数量减少到 CPU 数量之外,一些简单的修复可以使其更快(同样,取决于具体情况)可能如下所示:
liveprocs = list(processes)
while liveprocs:
try:
while 1:
process_queue_data(q.get(False))
except Queue.Empty:
pass
time.sleep(0.5) # Give tasks a chance to put more data in
if not q.empty():
continue
liveprocs = [p for p in liveprocs if p.is_alive()]
我正在尝试拆分 for 循环,即
N = 1000000
for i in xrange(N):
#do something
使用 multiprocessing.Process 并且它适用于较小的 N 值。 当我使用更大的 N 值时出现问题。在 p.join() 之前或期间发生了一些奇怪的事情并且程序没有响应。如果我在函数 f 的定义中输入 print i,而不是 q.put(i),一切正常。
如有任何帮助,我将不胜感激。这是代码。
from multiprocessing import Process, Queue
def f(q,nMin, nMax): # function for multiprocessing
for i in xrange(nMin,nMax):
q.put(i)
if __name__ == '__main__':
nEntries = 1000000
nCpu = 10
nEventsPerCpu = nEntries/nCpu
processes = []
q = Queue()
for i in xrange(nCpu):
processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )
for p in processes:
p.start()
for p in processes:
p.join()
print q.qsize()
您正在尝试无限制地增加您的队列,并且您正在加入一个正在等待队列中的空间的子进程,因此您的主进程停滞不前等待它完成,而且它永远不会。
如果您在加入之前将数据从队列中拉出,它将正常工作。
您可以使用的一种技术如下所示:
while 1:
running = any(p.is_alive() for p in processes)
while not queue.empty():
process_queue_data()
if not running:
break
根据文档,p.is_alive() 应该执行隐式连接,但它似乎也暗示最佳做法可能是在此之后对所有线程显式执行连接。
编辑:虽然这很清楚,但可能不是那么高效。如何让它表现得更好将是高度特定于任务和机器的(一般来说,无论如何,你不应该一次创建那么多进程,除非某些进程会在 I/O 上被阻止)。
除了将进程数量减少到 CPU 数量之外,一些简单的修复可以使其更快(同样,取决于具体情况)可能如下所示:
liveprocs = list(processes)
while liveprocs:
try:
while 1:
process_queue_data(q.get(False))
except Queue.Empty:
pass
time.sleep(0.5) # Give tasks a chance to put more data in
if not q.empty():
continue
liveprocs = [p for p in liveprocs if p.is_alive()]