使用多处理队列从进程中获取信息
Getting information back from a process with a multiprocessing Queue
我正在尝试使用多处理,我想在 Python 的主线程和带有队列的子进程之间进行通信。这是我编写的一个快速测试代码,应该定期获取子进程生成的一些结果:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print('done')
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print('DONE')
这个程序永远挂起,而用 threading.Thread
替换 Process
给出了预期的结果:
1
2
3
4
5
6
7
8
9
10
0
done
DONE
在这种情况下,如何使 Process 的行为与 Thread 相同?
您的程序在 POSIX (UNIX-like) 系统上运行良好。
但是,要使其在 ms-windows 和 macOS 上正常运行,您需要将程序本身放在一个主块中,以便可以无副作用地导入文件。
这是由于 multiprocessing
必须在 ms-windows 和 macOS 上工作的方式。阅读 programming guidelines 进行多处理。
像这样修改您的代码:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print("done")
if __name__ == "__main__":
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print("DONE")
这是一种简化且更可靠的方法,除了不打印零外,它(几乎)在功能上与 OP 的原始方法相同:
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
import time
def calculate(q):
for n in range(1, 11):
q.put(n)
time.sleep(1)
q.put(0)
def queue_getter(q):
while (n := q.get()):
print(n)
def main():
with Manager() as manager:
q = manager.Queue()
with ProcessPoolExecutor() as executor:
executor.submit(calculate, q)
queue_getter(q)
if __name__ == '__main__':
main()
我正在尝试使用多处理,我想在 Python 的主线程和带有队列的子进程之间进行通信。这是我编写的一个快速测试代码,应该定期获取子进程生成的一些结果:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print('done')
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print('DONE')
这个程序永远挂起,而用 threading.Thread
替换 Process
给出了预期的结果:
1
2
3
4
5
6
7
8
9
10
0
done
DONE
在这种情况下,如何使 Process 的行为与 Thread 相同?
您的程序在 POSIX (UNIX-like) 系统上运行良好。
但是,要使其在 ms-windows 和 macOS 上正常运行,您需要将程序本身放在一个主块中,以便可以无副作用地导入文件。
这是由于 multiprocessing
必须在 ms-windows 和 macOS 上工作的方式。阅读 programming guidelines 进行多处理。
像这样修改您的代码:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print("done")
if __name__ == "__main__":
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print("DONE")
这是一种简化且更可靠的方法,除了不打印零外,它(几乎)在功能上与 OP 的原始方法相同:
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
import time
def calculate(q):
for n in range(1, 11):
q.put(n)
time.sleep(1)
q.put(0)
def queue_getter(q):
while (n := q.get()):
print(n)
def main():
with Manager() as manager:
q = manager.Queue()
with ProcessPoolExecutor() as executor:
executor.submit(calculate, q)
queue_getter(q)
if __name__ == '__main__':
main()