如何在 Python 中从主线程终止生产者-消费者线程?

How to terminate Producer-Consumer threads from main thread in Python?

我有一个 Producer 和一个 Consumer 线程 (threading.Thread),它们共享一个 queue输入 Queue.

制作人run:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)

消费者run

while self.running or not queue.empty():
    product = queue.get()
    time.sleep(several_seconds) ###
    consume(product)

现在我需要从主线程终止两个线程,要求 queue 在终止之前必须为空(全部消耗)。

目前我正在使用如下代码来终止这两个线程:

主线程stop:

producer.running = False
producer.join()
consumer.running = False
consumer.join()

不过我估计消费者多了就不安全了

此外,我不确定sleep是否会向制作方发布时间表,以便制作更多产品。事实上,我发现生产者保留 "starving" 但我不确定这是否是根本原因。

有什么好的处理方法吗?

编辑 2:

a) 您的消费者持续花费如此多时间的原因是因为即使您没有数据,您的循环也会持续运行。

b) 我在底部添加了代码,展示了如何处理这个问题。

如果我没理解错的话,producer/consumer 是一个连续的过程,例如可以延迟关闭,直到您退出当前阻塞 I/O 并处理您从中收到的数据。

在那种情况下,为了以有序的方式关闭您的生产者和消费者,我会添加从主线程到生产者线程的通信以调用关闭。在最一般的情况下,这可能是一个队列,主线程可以使用它来排队 "shutdown" 代码,但在单个生产者将被停止并且永远不会重新启动的简单情况下,它可能只是一个全局关闭标志。

您的生产者应在其开始阻塞 I/O 操作之前(例如,在您完成向消费者队列发送其他数据之后)在其主循环中检查此关闭条件(队列或标志)。如果设置了标志,那么它应该在队列中放置一个特殊的数据结束代码(看起来不像你的正常数据)来告诉消费者正在关闭,然后生产者应该 return(自行终止)。

消费者应该被修改为在从队列中取出数据时检查这个数据结束代码。如果找到数据结束代码,它应该按顺序关闭并 return(自行终止)。

如果有多个消费者,那么生产者可以在关闭之前排队多条数据结束消息——每个消费者一条消息。由于消费者阅读消息后停止消费,最终都会全部关闭。

或者,如果您事先不知道有多少消费者,则消费者有序关闭的一部分可能是重新排队数据结束代码。

这将确保所有消费者最终看到数据结束代码并关闭,当所有完成后,队列中将剩下一项——数据结束代码排队最后一个消费者。

编辑:

表示数据结束代码的正确方法高度依赖于应用程序,但在许多情况下,简单的 None 就可以很好地工作。由于 None 是单例,消费者可以使用非常有效的 if data is None 构造来处理最终情况。

另一种在某些情况下甚至更有效的可能性是在 try /except 外部 您的主要消费者循环中设置一个 try /except 发生了,这是因为您试图以一种始终有效的方式解压缩数据,除非您正在处理数据结束代码。

编辑 2:

将这些概念与您的初始代码相结合,现在制作人这样做:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)
for x in range(number_of_consumers):
    queue.put(None)  # Termination code

每个消费者都这样做:

while 1:
    product = queue.get()
    if product is None:
        break
    consume(product)

然后主程序就可以这样做:

producer.running = False
producer.join()
for consumer in consumers:
    consumer.join()

您可以将哨兵对象放入队列中以发出任务结束信号,从而导致所有消费者终止:

_sentinel = object()

def producer(queue):
    while running:
       # produce some data
       queue.put(data)
    queue.put(_sentinel)

def consumer(queue):
    while True:
        data = queue.get()
        if data is _sentinel:
            # put it back so that other consumers see it
            queue.put(_sentinel)
            break
        # Process data

这段代码无耻地从 Python Cookbook 12.3 中复制过来。

  1. 使用 _sentinel 标记队列结束。 None 也适用于生产者生成的任务不是 None 的情况,但对于更一般的情况,使用 _sentinel 更安全。
  2. 您不需要为每个消费者将多个结束标记放入队列。您可能不知道有多少线程正在消耗。当消费者发现它时,只需将哨兵放回队列中,以便其他消费者获得信号。

从您的代码中观察到,您的 consumer 将继续从队列中寻找一些东西,理想情况下您应该通过保留一些 timeout 和处理 Empty 异常如下所示,理想情况下,这有助于检查每个 timeout.

while self.running or not queue.empty()
while self.running or not queue.empty():
    try:
        product = queue.get(timeout=1)
    except Empty:
        pass
    time.sleep(several_seconds) ###
    consume(product)

我确实模拟了你的情况并创建了 producerconsumer 线程,下面是示例代码 运行 2 producers 和 4 consumers它运作良好。希望对你有帮助!

import time
import threading

from Queue import Queue, Empty

"""A multi-producer, multi-consumer queue."""

# A thread that produces data
class Producer(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.running = True
        self.name = name
        self.args = args
        self.kwargs = kwargs

    def run(self):
        out_q = self.kwargs.get('queue')
        while self.running:
            # Adding some integer
            out_q.put(10)
            # Kepping this thread in sleep not to do many iterations
            time.sleep(0.1)

        print 'producer {name} terminated\n'.format(name=self.name)


# A thread that consumes data
class Consumer(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        self.producer_alive = True
        self.name = name

    def run(self):
        in_q = self.kwargs.get('queue')

        # Consumer should die one queue is producer si dead and queue is empty.
        while self.producer_alive or not in_q.empty():
            try:
                data = in_q.get(timeout=1)
            except Empty, e:
                pass

            # This part you can do anything to consume time
            if isinstance(data, int):
                # just doing some work, infact you can make this one sleep
                for i in xrange(data + 10**6):
                    pass
            else:
                pass
        print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
            name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())


# Create the shared queue and launch both thread pools
q = Queue()

producer_pool, consumer_pool = [], []


for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

for i in xrange(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

while 1:
    control_process = raw_input('> Y/N: ')
    if control_process == 'Y':
        for producer in producer_pool:
            producer.running = False
            # Joining this to make sure all the producers die
            producer.join()

        for consumer in consumer_pool:
            # Ideally consumer should stop once producers die
            consumer.producer_alive = False

        break