使用多处理队列从进程中获取信息

Getting information back from a process with a multiprocessing Queue

我正在尝试使用多处理,我想在 Python 的主线程和带有队列的子进程之间进行通信。这是我编写的一个快速测试代码,应该定期获取子进程生成的一些结果:

from multiprocessing import Process, Queue
import time

def calculate(queue):  
    n = 0
    while n < 10:   
        n += 1
        queue.put(n)
        time.sleep(1)
    queue.put(0)

def queue_getter(queue):
    executing = True
    while executing:     
        while queue.qsize():
            n = queue.get()
            print(n)
            if n == 0:
                executing = False
        time.sleep(0.1)
    print('done')

queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print('DONE')

这个程序永远挂起,而用 threading.Thread 替换 Process 给出了预期的结果:

1
2
3
4
5
6
7
8
9
10
0
done
DONE

在这种情况下,如何使 Process 的行为与 Thread 相同?

您的程序在 POSIX (UNIX-like) 系统上运行良好。

但是,要使其在 ms-windows 和 macOS 上正常运行,您需要将程序本身放在一个主块中,以便可以无副作用地导入文件。

这是由于 multiprocessing 必须在 ms-windows 和 macOS 上工作的方式。阅读 programming guidelines 进行多处理。

像这样修改您的代码:

from multiprocessing import Process, Queue
import time


def calculate(queue):
    n = 0
    while n < 10:
        n += 1
        queue.put(n)
        time.sleep(1)
    queue.put(0)


def queue_getter(queue):
    executing = True
    while executing:
        while queue.qsize():
            n = queue.get()
            print(n)
            if n == 0:
                executing = False
        time.sleep(0.1)
    print("done")


if __name__ == "__main__":
    queue = Queue()
    p = Process(target=calculate, args=(queue,))
    p.start()
    queue_getter(queue)
    p.join()
    print("DONE")

这是一种简化且更可靠的方法,除了不打印零外,它(几乎)在功能上与 OP 的原始方法相同:

from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
import time

def calculate(q):
    for n in range(1, 11):
        q.put(n)
        time.sleep(1)
    q.put(0)

def queue_getter(q):
    while (n := q.get()):
        print(n)

def main():
    with Manager() as manager:
        q = manager.Queue()
        with ProcessPoolExecutor() as executor:
            executor.submit(calculate, q)
            queue_getter(q)

if __name__ == '__main__':
    main()