如何使用输入和输出队列从 Python 的线程 class 收集结果?

How to gather results from Python's Thread class with an input and output queue?

我想在 python 中学习一些关于线程的知识。我知道我可以使用其他各种池和进程或管道,它们可能更容易使用,但我对使用线程模块很感兴趣。

from threading import Thread
from queue import Queue

class SimulationThread(Thread):

    def __init__(self, input_queue: Queue, results_queue: Queue):
        Thread.__init__(self)
        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self) -> None:
        try:
            data = self.input_queue.get() # will be replaced with simulation data
            self.results_queue.put(data)
        finally:
            return self.input_queue.task_done()

N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()
results_queue = Queue()

for x in range(NP):
    worker = SimulationThread(input_queue, results_queue)
    worker.daemon = True
    worker.start()

for i in range(N):
    input_queue.put(i)

现在我尝试了几种不同的方法来收集结果:

# always returns 0
print(results_queue.get())
#hangs
results_queue.join()
# does nothing, I'm quessing queue is not yet populated
while not results_queue.empty():
    print(results_queue.get())
# prints nothing
ret = results_queue.get()
while ret is None:
    ret = results_queue.get()
    print(ret)
# finally prints out the results, but in order of 1 - 7. No 8 or 9.
ret = results_queue.get()
while ret is not None:
    ret = results_queue.get()
    print(ret)
    if results_queue.empty():
        ret = None

While 是我停下来寻求帮助的地方。如何让所有NP线程同时处理所有N个数?

您可能想使用 JoinableQueue。每个任务在完成其工作时,都会从队列中调用 .task_done() 从输入中获取。

您的主线程然后在同一个队列上调用 queue.join()。这不会 return 直到对 task_done() 的调用与添加到队列中的项目一样多。

默认情况下 Queue.get 将在必要时阻止,直到有可用的项目为止。您将 10 个项目添加到 input_queue 队列,但随后您只创建了 8 个线程。

您的线程应持续收集和处理队列中的项目,直到它们停止。你可以尝试这样的事情:

test.py:

from queue import Queue
from threading import Thread

N = 10  # number of simulations to run
NP = 8  # number of threads to use


class SimulationThread(Thread):
    def __init__(self, input_queue, results_queue):
        super().__init__()

        self.input_queue = input_queue
        self.results_queue = results_queue

    def run(self):
        for data in iter(self.input_queue.get, "STOP"):
            self.results_queue.put(data * 2)


def main():
    input_queue = Queue()
    results_queue = Queue()

    for i in range(N):
        input_queue.put(i)

    for _ in range(NP):
        SimulationThread(input_queue, results_queue).start()

    for i in range(N):
        print(i, results_queue.get())

    for _ in range(NP):
        input_queue.put("STOP")


if __name__ == "__main__":
    main()

测试:

$ python test.py
0 0
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18