如何在 Python 中从主线程终止生产者-消费者线程?
How to terminate Producer-Consumer threads from main thread in Python?
我有一个 Producer 和一个 Consumer 线程 (threading.Thread
),它们共享一个 queue
输入 Queue
.
制作人run
:
while self.running:
product = produced() ### I/O operations
queue.put(product)
消费者run
:
while self.running or not queue.empty():
product = queue.get()
time.sleep(several_seconds) ###
consume(product)
现在我需要从主线程终止两个线程,要求 queue
在终止之前必须为空(全部消耗)。
目前我正在使用如下代码来终止这两个线程:
主线程stop
:
producer.running = False
producer.join()
consumer.running = False
consumer.join()
不过我估计消费者多了就不安全了
此外,我不确定sleep
是否会向制作方发布时间表,以便制作更多产品。事实上,我发现生产者保留 "starving" 但我不确定这是否是根本原因。
有什么好的处理方法吗?
编辑 2:
a) 您的消费者持续花费如此多时间的原因是因为即使您没有数据,您的循环也会持续运行。
b) 我在底部添加了代码,展示了如何处理这个问题。
如果我没理解错的话,producer/consumer 是一个连续的过程,例如可以延迟关闭,直到您退出当前阻塞 I/O 并处理您从中收到的数据。
在那种情况下,为了以有序的方式关闭您的生产者和消费者,我会添加从主线程到生产者线程的通信以调用关闭。在最一般的情况下,这可能是一个队列,主线程可以使用它来排队 "shutdown" 代码,但在单个生产者将被停止并且永远不会重新启动的简单情况下,它可能只是一个全局关闭标志。
您的生产者应在其开始阻塞 I/O 操作之前(例如,在您完成向消费者队列发送其他数据之后)在其主循环中检查此关闭条件(队列或标志)。如果设置了标志,那么它应该在队列中放置一个特殊的数据结束代码(看起来不像你的正常数据)来告诉消费者正在关闭,然后生产者应该 return(自行终止)。
消费者应该被修改为在从队列中取出数据时检查这个数据结束代码。如果找到数据结束代码,它应该按顺序关闭并 return(自行终止)。
如果有多个消费者,那么生产者可以在关闭之前排队多条数据结束消息——每个消费者一条消息。由于消费者阅读消息后停止消费,最终都会全部关闭。
或者,如果您事先不知道有多少消费者,则消费者有序关闭的一部分可能是重新排队数据结束代码。
这将确保所有消费者最终看到数据结束代码并关闭,当所有完成后,队列中将剩下一项——数据结束代码排队最后一个消费者。
编辑:
表示数据结束代码的正确方法高度依赖于应用程序,但在许多情况下,简单的 None
就可以很好地工作。由于 None
是单例,消费者可以使用非常有效的 if data is None
构造来处理最终情况。
另一种在某些情况下甚至更有效的可能性是在 try /except
外部 您的主要消费者循环中设置一个 try /except
发生了,这是因为您试图以一种始终有效的方式解压缩数据,除非您正在处理数据结束代码。
编辑 2:
将这些概念与您的初始代码相结合,现在制作人这样做:
while self.running:
product = produced() ### I/O operations
queue.put(product)
for x in range(number_of_consumers):
queue.put(None) # Termination code
每个消费者都这样做:
while 1:
product = queue.get()
if product is None:
break
consume(product)
然后主程序就可以这样做:
producer.running = False
producer.join()
for consumer in consumers:
consumer.join()
您可以将哨兵对象放入队列中以发出任务结束信号,从而导致所有消费者终止:
_sentinel = object()
def producer(queue):
while running:
# produce some data
queue.put(data)
queue.put(_sentinel)
def consumer(queue):
while True:
data = queue.get()
if data is _sentinel:
# put it back so that other consumers see it
queue.put(_sentinel)
break
# Process data
这段代码无耻地从 Python Cookbook 12.3 中复制过来。
- 使用
_sentinel
标记队列结束。 None
也适用于生产者生成的任务不是 None
的情况,但对于更一般的情况,使用 _sentinel
更安全。
- 您不需要为每个消费者将多个结束标记放入队列。您可能不知道有多少线程正在消耗。当消费者发现它时,只需将哨兵放回队列中,以便其他消费者获得信号。
从您的代码中观察到,您的 consumer
将继续从队列中寻找一些东西,理想情况下您应该通过保留一些 timeout
和处理 Empty
异常如下所示,理想情况下,这有助于检查每个 timeout
.
的 while self.running or not queue.empty()
while self.running or not queue.empty():
try:
product = queue.get(timeout=1)
except Empty:
pass
time.sleep(several_seconds) ###
consume(product)
我确实模拟了你的情况并创建了 producer
和 consumer
线程,下面是示例代码 运行 2 producers
和 4 consumers
它运作良好。希望对你有帮助!
import time
import threading
from Queue import Queue, Empty
"""A multi-producer, multi-consumer queue."""
# A thread that produces data
class Producer(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
threading.Thread.__init__(self, group=group, target=target, name=name,
verbose=verbose)
self.running = True
self.name = name
self.args = args
self.kwargs = kwargs
def run(self):
out_q = self.kwargs.get('queue')
while self.running:
# Adding some integer
out_q.put(10)
# Kepping this thread in sleep not to do many iterations
time.sleep(0.1)
print 'producer {name} terminated\n'.format(name=self.name)
# A thread that consumes data
class Consumer(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
threading.Thread.__init__(self, group=group, target=target, name=name,
verbose=verbose)
self.args = args
self.kwargs = kwargs
self.producer_alive = True
self.name = name
def run(self):
in_q = self.kwargs.get('queue')
# Consumer should die one queue is producer si dead and queue is empty.
while self.producer_alive or not in_q.empty():
try:
data = in_q.get(timeout=1)
except Empty, e:
pass
# This part you can do anything to consume time
if isinstance(data, int):
# just doing some work, infact you can make this one sleep
for i in xrange(data + 10**6):
pass
else:
pass
print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())
# Create the shared queue and launch both thread pools
q = Queue()
producer_pool, consumer_pool = [], []
for i in range(1, 3):
producer_worker = Producer(kwargs={'queue': q}, name=str(i))
producer_pool.append(producer_worker)
producer_worker.start()
for i in xrange(1, 5):
consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
consumer_pool.append(consumer_worker)
consumer_worker.start()
while 1:
control_process = raw_input('> Y/N: ')
if control_process == 'Y':
for producer in producer_pool:
producer.running = False
# Joining this to make sure all the producers die
producer.join()
for consumer in consumer_pool:
# Ideally consumer should stop once producers die
consumer.producer_alive = False
break
我有一个 Producer 和一个 Consumer 线程 (threading.Thread
),它们共享一个 queue
输入 Queue
.
制作人run
:
while self.running:
product = produced() ### I/O operations
queue.put(product)
消费者run
:
while self.running or not queue.empty():
product = queue.get()
time.sleep(several_seconds) ###
consume(product)
现在我需要从主线程终止两个线程,要求 queue
在终止之前必须为空(全部消耗)。
目前我正在使用如下代码来终止这两个线程:
主线程stop
:
producer.running = False
producer.join()
consumer.running = False
consumer.join()
不过我估计消费者多了就不安全了
此外,我不确定sleep
是否会向制作方发布时间表,以便制作更多产品。事实上,我发现生产者保留 "starving" 但我不确定这是否是根本原因。
有什么好的处理方法吗?
编辑 2:
a) 您的消费者持续花费如此多时间的原因是因为即使您没有数据,您的循环也会持续运行。
b) 我在底部添加了代码,展示了如何处理这个问题。
如果我没理解错的话,producer/consumer 是一个连续的过程,例如可以延迟关闭,直到您退出当前阻塞 I/O 并处理您从中收到的数据。
在那种情况下,为了以有序的方式关闭您的生产者和消费者,我会添加从主线程到生产者线程的通信以调用关闭。在最一般的情况下,这可能是一个队列,主线程可以使用它来排队 "shutdown" 代码,但在单个生产者将被停止并且永远不会重新启动的简单情况下,它可能只是一个全局关闭标志。
您的生产者应在其开始阻塞 I/O 操作之前(例如,在您完成向消费者队列发送其他数据之后)在其主循环中检查此关闭条件(队列或标志)。如果设置了标志,那么它应该在队列中放置一个特殊的数据结束代码(看起来不像你的正常数据)来告诉消费者正在关闭,然后生产者应该 return(自行终止)。
消费者应该被修改为在从队列中取出数据时检查这个数据结束代码。如果找到数据结束代码,它应该按顺序关闭并 return(自行终止)。
如果有多个消费者,那么生产者可以在关闭之前排队多条数据结束消息——每个消费者一条消息。由于消费者阅读消息后停止消费,最终都会全部关闭。
或者,如果您事先不知道有多少消费者,则消费者有序关闭的一部分可能是重新排队数据结束代码。
这将确保所有消费者最终看到数据结束代码并关闭,当所有完成后,队列中将剩下一项——数据结束代码排队最后一个消费者。
编辑:
表示数据结束代码的正确方法高度依赖于应用程序,但在许多情况下,简单的 None
就可以很好地工作。由于 None
是单例,消费者可以使用非常有效的 if data is None
构造来处理最终情况。
另一种在某些情况下甚至更有效的可能性是在 try /except
外部 您的主要消费者循环中设置一个 try /except
发生了,这是因为您试图以一种始终有效的方式解压缩数据,除非您正在处理数据结束代码。
编辑 2:
将这些概念与您的初始代码相结合,现在制作人这样做:
while self.running:
product = produced() ### I/O operations
queue.put(product)
for x in range(number_of_consumers):
queue.put(None) # Termination code
每个消费者都这样做:
while 1:
product = queue.get()
if product is None:
break
consume(product)
然后主程序就可以这样做:
producer.running = False
producer.join()
for consumer in consumers:
consumer.join()
您可以将哨兵对象放入队列中以发出任务结束信号,从而导致所有消费者终止:
_sentinel = object()
def producer(queue):
while running:
# produce some data
queue.put(data)
queue.put(_sentinel)
def consumer(queue):
while True:
data = queue.get()
if data is _sentinel:
# put it back so that other consumers see it
queue.put(_sentinel)
break
# Process data
这段代码无耻地从 Python Cookbook 12.3 中复制过来。
- 使用
_sentinel
标记队列结束。None
也适用于生产者生成的任务不是None
的情况,但对于更一般的情况,使用_sentinel
更安全。 - 您不需要为每个消费者将多个结束标记放入队列。您可能不知道有多少线程正在消耗。当消费者发现它时,只需将哨兵放回队列中,以便其他消费者获得信号。
从您的代码中观察到,您的 consumer
将继续从队列中寻找一些东西,理想情况下您应该通过保留一些 timeout
和处理 Empty
异常如下所示,理想情况下,这有助于检查每个 timeout
.
while self.running or not queue.empty()
while self.running or not queue.empty():
try:
product = queue.get(timeout=1)
except Empty:
pass
time.sleep(several_seconds) ###
consume(product)
我确实模拟了你的情况并创建了 producer
和 consumer
线程,下面是示例代码 运行 2 producers
和 4 consumers
它运作良好。希望对你有帮助!
import time
import threading
from Queue import Queue, Empty
"""A multi-producer, multi-consumer queue."""
# A thread that produces data
class Producer(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
threading.Thread.__init__(self, group=group, target=target, name=name,
verbose=verbose)
self.running = True
self.name = name
self.args = args
self.kwargs = kwargs
def run(self):
out_q = self.kwargs.get('queue')
while self.running:
# Adding some integer
out_q.put(10)
# Kepping this thread in sleep not to do many iterations
time.sleep(0.1)
print 'producer {name} terminated\n'.format(name=self.name)
# A thread that consumes data
class Consumer(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
threading.Thread.__init__(self, group=group, target=target, name=name,
verbose=verbose)
self.args = args
self.kwargs = kwargs
self.producer_alive = True
self.name = name
def run(self):
in_q = self.kwargs.get('queue')
# Consumer should die one queue is producer si dead and queue is empty.
while self.producer_alive or not in_q.empty():
try:
data = in_q.get(timeout=1)
except Empty, e:
pass
# This part you can do anything to consume time
if isinstance(data, int):
# just doing some work, infact you can make this one sleep
for i in xrange(data + 10**6):
pass
else:
pass
print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())
# Create the shared queue and launch both thread pools
q = Queue()
producer_pool, consumer_pool = [], []
for i in range(1, 3):
producer_worker = Producer(kwargs={'queue': q}, name=str(i))
producer_pool.append(producer_worker)
producer_worker.start()
for i in xrange(1, 5):
consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
consumer_pool.append(consumer_worker)
consumer_worker.start()
while 1:
control_process = raw_input('> Y/N: ')
if control_process == 'Y':
for producer in producer_pool:
producer.running = False
# Joining this to make sure all the producers die
producer.join()
for consumer in consumer_pool:
# Ideally consumer should stop once producers die
consumer.producer_alive = False
break