两个 python 个池,两个队列
Two python pools, two queues
我试图了解池和队列在 Python 中的工作方式,但以下示例未按预期工作。我希望程序结束,但它陷入了无限循环,因为第二个队列没有被清空。
import multiprocessing
import os
import time
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
def worker_main(q1, q2):
while True:
i = q1.get(True)
time.sleep(.1)
q2.put(i*2)
def worker2(q):
print q.get(True)
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
while inq.qsize()>0 or outq.qsize()>0:
print 'q1 size', inq.qsize(), 'q2 size', outq.qsize()
time.sleep(.1)
输出显示第二个队列(outq)是.get一次,仅此而已。
输出:
q1 size 49 q2 size 0
q1 size 47 q2 size 0
2
4
q1 size 44 q2 size 1
q1 size 41 q2 size 4
q1 size 38 q2 size 7
q1 size 35 q2 size 11
q1 size 31 q2 size 14
q1 size 27 q2 size 18
q1 size 24 q2 size 21
q1 size 22 q2 size 23
q1 size 19 q2 size 26
q1 size 15 q2 size 30
q1 size 12 q2 size
为什么 worker2 直到 outq 为空才被调用?
这是一种使用 Pool
的非常奇怪的方式。传递给构造函数的函数在池中的每个进程中只被调用一次。它用于一次性初始化任务,很少使用。
照原样,您的 worker2
被调用了两次,您的 p2
池中的每个进程一次。您的函数从队列中获取一个值,然后退出。该过程从不做任何其他事情。所以它完全按照您编写的代码执行。
根本没有明显的理由在这里使用 Pool
;创建 5 multiprocessing.Process
个对象会更自然。
如果你觉得必须这样,那么你需要在worker2
中放一个循环。这是一种方法:
import multiprocessing
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
print(q.get())
if __name__ == "__main__":
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
for i in range(3): # tell worker_main we're done
inq.put(None)
while inq.qsize()>0 or outq.qsize()>0:
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
time.sleep(.1)
建议
这是一种使用 Process
对象的 "more natural" 方式,并使用队列哨兵(特殊值 - 这里 None
)让进程知道何时停止。顺便说一句,我使用的是 Python 3,因此请将 print
用作函数而不是语句。
import multiprocessing as mp
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
i = q.get()
if i is None:
break
print(i)
def wait(procs):
alive_count = len(procs)
while alive_count:
alive_count = 0
for p in procs:
if p.is_alive():
p.join(timeout=0.1)
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
alive_count += 1
if __name__ == "__main__":
inq = mp.Queue()
outq = mp.Queue()
p1s = [mp.Process(target=worker_main, args=(inq, outq,))
for i in range(3)]
p2s = [mp.Process(target=worker2, args=(outq,))
for i in range(2)]
for p in p1s + p2s:
p.start()
for i in range(50):
inq.put(i)
for p in p1s: # tell worker_main we're done
inq.put(None)
wait(p1s)
# Tell worker2 we're done
for p in p2s:
outq.put(None)
wait(p2s)
我试图了解池和队列在 Python 中的工作方式,但以下示例未按预期工作。我希望程序结束,但它陷入了无限循环,因为第二个队列没有被清空。
import multiprocessing
import os
import time
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
def worker_main(q1, q2):
while True:
i = q1.get(True)
time.sleep(.1)
q2.put(i*2)
def worker2(q):
print q.get(True)
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
while inq.qsize()>0 or outq.qsize()>0:
print 'q1 size', inq.qsize(), 'q2 size', outq.qsize()
time.sleep(.1)
输出显示第二个队列(outq)是.get一次,仅此而已。
输出:
q1 size 49 q2 size 0 q1 size 47 q2 size 0 2 4 q1 size 44 q2 size 1 q1 size 41 q2 size 4 q1 size 38 q2 size 7 q1 size 35 q2 size 11 q1 size 31 q2 size 14 q1 size 27 q2 size 18 q1 size 24 q2 size 21 q1 size 22 q2 size 23 q1 size 19 q2 size 26 q1 size 15 q2 size 30 q1 size 12 q2 size
为什么 worker2 直到 outq 为空才被调用?
这是一种使用 Pool
的非常奇怪的方式。传递给构造函数的函数在池中的每个进程中只被调用一次。它用于一次性初始化任务,很少使用。
照原样,您的 worker2
被调用了两次,您的 p2
池中的每个进程一次。您的函数从队列中获取一个值,然后退出。该过程从不做任何其他事情。所以它完全按照您编写的代码执行。
根本没有明显的理由在这里使用 Pool
;创建 5 multiprocessing.Process
个对象会更自然。
如果你觉得必须这样,那么你需要在worker2
中放一个循环。这是一种方法:
import multiprocessing
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
print(q.get())
if __name__ == "__main__":
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))
for i in range(50):
inq.put(i)
for i in range(3): # tell worker_main we're done
inq.put(None)
while inq.qsize()>0 or outq.qsize()>0:
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
time.sleep(.1)
建议
这是一种使用 Process
对象的 "more natural" 方式,并使用队列哨兵(特殊值 - 这里 None
)让进程知道何时停止。顺便说一句,我使用的是 Python 3,因此请将 print
用作函数而不是语句。
import multiprocessing as mp
import time
def worker_main(q1, q2):
while True:
i = q1.get()
if i is None:
break
time.sleep(.1)
q2.put(i*2)
def worker2(q):
while True:
i = q.get()
if i is None:
break
print(i)
def wait(procs):
alive_count = len(procs)
while alive_count:
alive_count = 0
for p in procs:
if p.is_alive():
p.join(timeout=0.1)
print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
alive_count += 1
if __name__ == "__main__":
inq = mp.Queue()
outq = mp.Queue()
p1s = [mp.Process(target=worker_main, args=(inq, outq,))
for i in range(3)]
p2s = [mp.Process(target=worker2, args=(outq,))
for i in range(2)]
for p in p1s + p2s:
p.start()
for i in range(50):
inq.put(i)
for p in p1s: # tell worker_main we're done
inq.put(None)
wait(p1s)
# Tell worker2 we're done
for p in p2s:
outq.put(None)
wait(p2s)