How to implement multiprocessing IPC with multiple queues?

I'm new to multiprocessing and multithreading. For learning purposes, I'm trying to implement IPC using queues.

Code


from multiprocessing import Process, Queue, Lock

import math



def calculate_square(sq_q, sqrt_q):
    itm = sq_q.get()
    print(f"Calculating sq of: {itm}")
    square = itm * itm
    sqrt_q.put(square)

def calculate_sqroot(sqrt_q, result_q):
    itm = sqrt_q.get()
    print(f"Calculating sqrt of: {itm}")
    sqrt = math.sqrt(itm)
    result_q.put(sqrt)



sq_q = Queue()
sqrt_q = Queue()
result_q = Queue()


for i in range(5, 20):
    sq_q.put(i)


p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))


p_sq.start()
p_sqrt.start()



p_sq.join()
p_sqrt.join()

while not result_q.empty():
    print(result_q.get())

Explanation

Here I'm trying to run two functions in two different processes: one calculates the square of each number, and the other calculates the square root of that square.

Queues

Problem

Only the first item of sq_q is consumed.

Output:

5.0

I expect the output to be: [5, 6, 7, 8, ..., 19]

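Please note that this is purely for learning purposes: I want to implement IPC with multiple queues, even though it could also be done with shared objects, locks, and arrays.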

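Each process runs its function only once, and each call does a single get(), so only the first value, 5, is taken. Instead, you want to loop over all the values in the queue: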

while not sq_q.empty():
    itm = sq_q.get()
    print(f"Calculating sq of: {itm}")
    square = itm * itm
    sqrt_q.put(square)

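The same goes for the other function, but here the loop runs until result_q is full (give the result queue a maximum size so that this condition can work). At that point result_q holds all of the final values.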

    while not result_q.full():
        itm = sqrt_q.get()
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)
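The empty()/full() checks above are enough for this small example, but the multiprocessing documentation notes that empty(), full() and qsize() are not reliable because of multithreading/multiprocessing semantics. A common alternative, swapped in here rather than taken from the answer above, is a sentinel ("poison pill") value that each stage forwards downstream before it stops. This is a minimal sketch, assuming None never appears as real data; SENTINEL and run_pipeline are hypothetical names.

```python
import math
from multiprocessing import Process, Queue

# Hypothetical sentinel marking "no more work"; assumes None is never real data.
SENTINEL = None


def calculate_square(sq_q, sqrt_q):
    while True:
        itm = sq_q.get()  # blocks until an item arrives
        if itm is SENTINEL:
            sqrt_q.put(SENTINEL)  # pass the shutdown signal downstream
            break
        print(f"Calculating sq of: {itm}")
        sqrt_q.put(itm * itm)


def calculate_sqroot(sqrt_q, result_q):
    while True:
        itm = sqrt_q.get()  # blocks until an item arrives
        if itm is SENTINEL:
            result_q.put(SENTINEL)  # tell the main process we are done
            break
        print(f"Calculating sqrt of: {itm}")
        result_q.put(math.sqrt(itm))


def run_pipeline(numbers):
    sq_q, sqrt_q, result_q = Queue(), Queue(), Queue()
    workers = [
        Process(target=calculate_square, args=(sq_q, sqrt_q)),
        Process(target=calculate_sqroot, args=(sqrt_q, result_q)),
    ]
    for w in workers:
        w.start()

    for n in numbers:
        sq_q.put(n)
    sq_q.put(SENTINEL)  # input is finished

    # Read the results before join(): a process that has put items on a queue
    # waits, before exiting, until they are flushed to the underlying pipe.
    results = []
    while (value := result_q.get()) is not SENTINEL:
        results.append(value)

    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    print(run_pipeline(range(5, 20)))
```

Because the main process reads result_q until it sees the sentinel, it does not need to know in advance how many results to expect, and no queue needs a maximum size.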

Full code

import math
from multiprocessing import Process, Queue


def calculate_square(sq_q, sqrt_q):
    while not sq_q.empty():
        itm = sq_q.get()
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)


def calculate_sqroot(sqrt_q, result_q):
    while not result_q.full():
        itm = sqrt_q.get()
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)


if __name__ == "__main__":
    sq_q = Queue()
    sqrt_q = Queue()
    result_q = Queue(5)

    for i in range(5):
        sq_q.put(i)

    p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
    p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))

    p_sq.start()
    p_sqrt.start()

    p_sq.join()
    p_sqrt.join()

    while not result_q.empty():
        print(result_q.get())

Output

Calculating sq of: 0
Calculating sq of: 1
Calculating sq of: 2
Calculating sq of: 3
Calculating sq of: 4
Calculating sqrt of: 0
Calculating sqrt of: 1
Calculating sqrt of: 4
Calculating sqrt of: 9
Calculating sqrt of: 16
0.0
1.0
2.0
3.0
4.0

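Edit:

Since calculate_sqroot now depends on result_q, the delay is no longer needed.

We can also use JoinableQueue, which provides two convenient methods that let us block the main process until the items in the queue have been completely consumed.

i. task_done()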

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

ii. join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
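To see the unfinished-task counting described above in action, here is a small sketch. The names consumer and run are hypothetical, and doubling each item stands in for real work. The consumer is started as a daemon because its loop never returns on its own.

```python
from multiprocessing import JoinableQueue, Process, Queue


def consumer(tasks, results):
    while True:
        itm = tasks.get()     # fetch one task
        results.put(itm * 2)  # hypothetical work: double the item
        tasks.task_done()     # one task_done() for every get()


def run(items):
    tasks = JoinableQueue()
    results = Queue()
    for itm in items:
        tasks.put(itm)  # each put() increments the unfinished-task count

    p = Process(target=consumer, args=(tasks, results), daemon=True)
    p.start()
    tasks.join()  # unblocks once the count has dropped back to zero

    collected = [results.get() for _ in items]

    # Every put() is now matched by a task_done(), so one extra call
    # raises ValueError, as the documentation quoted above says.
    try:
        tasks.task_done()
        extra_call_raised = False
    except ValueError:
        extra_call_raised = True
    return collected, extra_call_raised


if __name__ == "__main__":
    print(run([1, 2, 3]))  # ([2, 4, 6], True)
```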

Solution

import math
from multiprocessing import JoinableQueue, Process, Queue


def calculate_square(sq_q, sqrt_q):
    while True:
        itm = sq_q.get()  # blocks until an item is available
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)
        sq_q.task_done()  # mark this item of sq_q as fully processed


def calculate_sqroot(sqrt_q, result_q):
    while True:
        itm = sqrt_q.get()  # blocks until an item is available
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)
        sqrt_q.task_done()  # mark this item of sqrt_q as fully processed


if __name__ == "__main__":
    items = list(range(5, 20))

    sq_q = JoinableQueue()
    sqrt_q = JoinableQueue()
    result_q = Queue()  # never joined, so a plain Queue is enough

    for i in items:
        sq_q.put(i)

    # Both workers loop forever, so make them daemons; otherwise the
    # interpreter waits for them at exit and the script never terminates.
    p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q), daemon=True)
    p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q), daemon=True)

    p_sq.start()
    p_sqrt.start()

    sq_q.join()    # every number has been squared and put on sqrt_q
    sqrt_q.join()  # every square has been rooted and put on result_q

    # Fetch exactly one result per input instead of polling empty(),
    # which the documentation describes as not reliable.
    for _ in items:
        print(result_q.get())