如何使用输入和输出队列从 Python 的线程 class 收集结果?
How to gather results from Python's Thread class with an input and output queue?
我想在 python 中学习一些关于线程的知识。我知道我可以使用其他各种池和进程或管道,它们可能更容易使用,但我对使用线程模块很感兴趣。
from threading import Thread
from queue import Queue
class SimulationThread(Thread):
def __init__(self, input_queue: Queue, results_queue: Queue):
Thread.__init__(self)
self.input_queue = input_queue
self.results_queue = results_queue
def run(self) -> None:
try:
data = self.input_queue.get() # will be replaced with simulation data
self.results_queue.put(data)
finally:
return self.input_queue.task_done()
N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()
results_queue = Queue()
for x in range(NP):
worker = SimulationThread(input_queue, results_queue)
worker.daemon = True
worker.start()
for i in range(N):
input_queue.put(i)
现在我尝试了几种不同的方法来收集结果:
# always returns 0
print(results_queue.get())
#hangs
results_queue.join()
# does nothing, I'm quessing queue is not yet populated
while not results_queue.empty():
print(results_queue.get())
# prints nothing
ret = results_queue.get()
while ret is None:
ret = results_queue.get()
print(ret)
# finally prints out the results, but in order of 1 - 7. No 8 or 9.
ret = results_queue.get()
while ret is not None:
ret = results_queue.get()
print(ret)
if results_queue.empty():
ret = None
While 是我停下来寻求帮助的地方。如何让所有NP线程同时处理所有N个数?
您可能想使用 JoinableQueue
。每个任务在完成其工作时,都会从队列中调用 .task_done()
从输入中获取。
您的主线程然后在同一个队列上调用 queue.join()
。这不会 return 直到对 task_done()
的调用与添加到队列中的项目一样多。
默认情况下 Queue.get 将在必要时阻止,直到有可用的项目为止。您将 10 个项目添加到 input_queue
队列,但随后您只创建了 8 个线程。
您的线程应持续收集和处理队列中的项目,直到它们停止。你可以尝试这样的事情:
test.py:
from queue import Queue
from threading import Thread
N = 10 # number of simulations to run
NP = 8 # number of threads to use
class SimulationThread(Thread):
def __init__(self, input_queue, results_queue):
super().__init__()
self.input_queue = input_queue
self.results_queue = results_queue
def run(self):
for data in iter(self.input_queue.get, "STOP"):
self.results_queue.put(data * 2)
def main():
input_queue = Queue()
results_queue = Queue()
for i in range(N):
input_queue.put(i)
for _ in range(NP):
SimulationThread(input_queue, results_queue).start()
for i in range(N):
print(i, results_queue.get())
for _ in range(NP):
input_queue.put("STOP")
if __name__ == "__main__":
main()
测试:
$ python test.py
0 0
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18
我想在 python 中学习一些关于线程的知识。我知道我可以使用其他各种池和进程或管道,它们可能更容易使用,但我对使用线程模块很感兴趣。
from threading import Thread
from queue import Queue
class SimulationThread(Thread):
def __init__(self, input_queue: Queue, results_queue: Queue):
Thread.__init__(self)
self.input_queue = input_queue
self.results_queue = results_queue
def run(self) -> None:
try:
data = self.input_queue.get() # will be replaced with simulation data
self.results_queue.put(data)
finally:
return self.input_queue.task_done()
N = 10 # number of simulations to run
NP = 8 # number of threads to use
input_queue = Queue()
results_queue = Queue()
for x in range(NP):
worker = SimulationThread(input_queue, results_queue)
worker.daemon = True
worker.start()
for i in range(N):
input_queue.put(i)
现在我尝试了几种不同的方法来收集结果:
# always returns 0
print(results_queue.get())
#hangs
results_queue.join()
# does nothing, I'm quessing queue is not yet populated
while not results_queue.empty():
print(results_queue.get())
# prints nothing
ret = results_queue.get()
while ret is None:
ret = results_queue.get()
print(ret)
# finally prints out the results, but in order of 1 - 7. No 8 or 9.
ret = results_queue.get()
while ret is not None:
ret = results_queue.get()
print(ret)
if results_queue.empty():
ret = None
While 是我停下来寻求帮助的地方。如何让所有NP线程同时处理所有N个数?
您可能想使用 JoinableQueue
。每个任务在完成其工作时,都会从队列中调用 .task_done()
从输入中获取。
您的主线程然后在同一个队列上调用 queue.join()
。这不会 return 直到对 task_done()
的调用与添加到队列中的项目一样多。
默认情况下 Queue.get 将在必要时阻止,直到有可用的项目为止。您将 10 个项目添加到 input_queue
队列,但随后您只创建了 8 个线程。
您的线程应持续收集和处理队列中的项目,直到它们停止。你可以尝试这样的事情:
test.py:
from queue import Queue
from threading import Thread
N = 10 # number of simulations to run
NP = 8 # number of threads to use
class SimulationThread(Thread):
def __init__(self, input_queue, results_queue):
super().__init__()
self.input_queue = input_queue
self.results_queue = results_queue
def run(self):
for data in iter(self.input_queue.get, "STOP"):
self.results_queue.put(data * 2)
def main():
input_queue = Queue()
results_queue = Queue()
for i in range(N):
input_queue.put(i)
for _ in range(NP):
SimulationThread(input_queue, results_queue).start()
for i in range(N):
print(i, results_queue.get())
for _ in range(NP):
input_queue.put("STOP")
if __name__ == "__main__":
main()
测试:
$ python test.py
0 0
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18