两个 python 个池,两个队列

Two python pools, two queues

我试图了解池和队列在 Python 中的工作方式,但以下示例未按预期工作。我希望程序结束,但它陷入了无限循环,因为第二个队列没有被清空。

import multiprocessing
import os
import time

inq = multiprocessing.Queue()
outq = multiprocessing.Queue()

def worker_main(q1, q2):
    while True:
        i = q1.get(True)
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    print q.get(True)

p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
p2 = multiprocessing.Pool(2, worker2,(outq,))


for i in range(50):
    inq.put(i)


while inq.qsize()>0 or outq.qsize()>0:
    print 'q1 size', inq.qsize(), 'q2 size', outq.qsize()
    time.sleep(.1)

输出显示第二个队列(outq)是.get一次,仅此而已。

输出:

q1 size 49 q2 size 0
q1 size 47 q2 size 0
2
4
q1 size 44 q2 size 1
q1 size 41 q2 size 4
q1 size 38 q2 size 7
q1 size 35 q2 size 11
q1 size 31 q2 size 14
q1 size 27 q2 size 18
q1 size 24 q2 size 21
q1 size 22 q2 size 23
q1 size 19 q2 size 26
q1 size 15 q2 size 30
q1 size 12 q2 size

为什么 worker2 直到 outq 为空才被调用?

这是一种使用 Pool 的非常奇怪的方式。传递给构造函数的函数在池中的每个进程中只被调用一次。它用于一次性初始化任务,很少使用。

照原样,您的 worker2 被调用了两次,您的 p2 池中的每个进程一次。您的函数从队列中获取一个值,然后退出。该过程从不做任何其他事情。所以它完全按照您编写的代码执行。

根本没有明显的理由在这里使用 Pool;创建 5 multiprocessing.Process 个对象会更自然。

如果你觉得必须这样,那么你需要在worker2中放一个循环。这是一种方法:

import multiprocessing
import time

def worker_main(q1, q2):
    while True:
        i = q1.get()
        if i is None:
            break
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    while True:
        print(q.get())

if __name__ == "__main__":
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    p1 = multiprocessing.Pool(3, worker_main,(inq, outq,))
    p2 = multiprocessing.Pool(2, worker2,(outq,))

    for i in range(50):
        inq.put(i)
    for i in range(3): # tell worker_main we're done
        inq.put(None)

    while inq.qsize()>0 or outq.qsize()>0:
        print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
        time.sleep(.1)

建议

这是一种使用 Process 对象的 "more natural" 方式,并使用队列哨兵(特殊值 - 这里 None)让进程知道何时停止。顺便说一句,我使用的是 Python 3,因此请将 print 用作函数而不是语句。

import multiprocessing as mp
import time

def worker_main(q1, q2):
    while True:
        i = q1.get()
        if i is None:
            break
        time.sleep(.1) 
        q2.put(i*2)

def worker2(q):
    while True:
        i = q.get()
        if i is None:
            break
        print(i)

def wait(procs):
    alive_count = len(procs)
    while alive_count:
        alive_count = 0
        for p in procs:
            if p.is_alive():
                p.join(timeout=0.1)
                print('q1 size', inq.qsize(), 'q2 size', outq.qsize())
                alive_count += 1

if __name__ == "__main__":
    inq = mp.Queue()
    outq = mp.Queue()
    p1s = [mp.Process(target=worker_main, args=(inq, outq,))
           for i in range(3)]
    p2s = [mp.Process(target=worker2, args=(outq,))
           for i in range(2)]
    for p in p1s + p2s:
        p.start()

    for i in range(50):
        inq.put(i)
    for p in p1s: # tell worker_main we're done
        inq.put(None)

    wait(p1s)
    # Tell worker2 we're done
    for p in p2s:
        outq.put(None)
    wait(p2s)