Python 多进程从队列中获取结果
Python multiprocess get result from queue
我是 运行 一个多处理脚本,它应该在大约 0.01 秒内启动 2.000.000 个作业。每个作业将结果放入从 Queue 导入的队列中,因为来自 Multiprocessing 模块的队列无法处理其中超过 517 个结果。
我的程序在从队列中获取结果之前冻结。这是我的多进程函数的核心:
while argslist != []:
p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
jobs.append(p)
p.start()
for p in jobs:
p.join()
print 'over'
res = [result_queue.get() for p in jobs]
print 'got it'
输出:"over" 但永远不会 "got it"
当我替换
result_queue.get()
来自
result_queue.get_nowait()
我收到 raise Empty 错误,说我的队列是空的...
但是如果我在我的内部函数中的 queue.put() 之后执行 queue.get() ,那么它就可以工作,向我表明我的函数很好地归档了队列..
queue.Queue
不在进程之间共享,因此它无法使用,您必须使用 multiprocessing.Queue
.
为避免死锁,您不应在从队列中获取结果之前加入进程。 multiprocessing.Queue
受到其底层管道缓冲区的有效限制,因此如果填满,则无法将更多项目刷新到管道并且 queue.put()
将阻塞,直到消费者调用 queue.get()
,但如果消费者正在加入一个阻塞的进程,那么你就有了死锁。
您可以通过使用 multiprocessing.Pool
及其 map()
来避免所有这些。
谢谢 mata,我切换回了 multiprocessing.Queue(),但我不想使用池,因为我想跟踪有多少作业完成了 运行。我终于添加了一个 if 语句来定期清空我的队列。
def multiprocess(function, argslist, ncpu):
total = len(argslist)
done = 0
result_queue = mp.Queue(0)
jobs = []
res = []
while argslist != []:
if len(mp.active_children()) < ncpu:
p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
jobs.append(p)
p.start()
done += 1
print "\r",float(done)/total*100,"%", #here is to keep track
# here comes my emptying step
if len(jobs) == 500:
tmp = [result_queue.get() for p in jobs]
for r in tmp:
res.append(r)
result_queue = mp.Queue(0)
jobs = []
tmp = [result_queue.get() for p in jobs]
for r in tmp:
res.append(r)
return res
然后我想到了这个问题:
500 个作业的限制是因为 python 还是因为我的机器或系统?
如果在其他条件下使用我的多处理功能,这个阈值会不会有问题?
我是 运行 一个多处理脚本,它应该在大约 0.01 秒内启动 2.000.000 个作业。每个作业将结果放入从 Queue 导入的队列中,因为来自 Multiprocessing 模块的队列无法处理其中超过 517 个结果。
我的程序在从队列中获取结果之前冻结。这是我的多进程函数的核心:
while argslist != []:
p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
jobs.append(p)
p.start()
for p in jobs:
p.join()
print 'over'
res = [result_queue.get() for p in jobs]
print 'got it'
输出:"over" 但永远不会 "got it"
当我替换
result_queue.get()
来自
result_queue.get_nowait()
我收到 raise Empty 错误,说我的队列是空的...
但是如果我在我的内部函数中的 queue.put() 之后执行 queue.get() ,那么它就可以工作,向我表明我的函数很好地归档了队列..
queue.Queue
不在进程之间共享,因此它无法使用,您必须使用 multiprocessing.Queue
.
为避免死锁,您不应在从队列中获取结果之前加入进程。 multiprocessing.Queue
受到其底层管道缓冲区的有效限制,因此如果填满,则无法将更多项目刷新到管道并且 queue.put()
将阻塞,直到消费者调用 queue.get()
,但如果消费者正在加入一个阻塞的进程,那么你就有了死锁。
您可以通过使用 multiprocessing.Pool
及其 map()
来避免所有这些。
谢谢 mata,我切换回了 multiprocessing.Queue(),但我不想使用池,因为我想跟踪有多少作业完成了 运行。我终于添加了一个 if 语句来定期清空我的队列。
def multiprocess(function, argslist, ncpu):
total = len(argslist)
done = 0
result_queue = mp.Queue(0)
jobs = []
res = []
while argslist != []:
if len(mp.active_children()) < ncpu:
p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
jobs.append(p)
p.start()
done += 1
print "\r",float(done)/total*100,"%", #here is to keep track
# here comes my emptying step
if len(jobs) == 500:
tmp = [result_queue.get() for p in jobs]
for r in tmp:
res.append(r)
result_queue = mp.Queue(0)
jobs = []
tmp = [result_queue.get() for p in jobs]
for r in tmp:
res.append(r)
return res
然后我想到了这个问题:
500 个作业的限制是因为 python 还是因为我的机器或系统?
如果在其他条件下使用我的多处理功能,这个阈值会不会有问题?