Why doesn't the producer-consumer stop?

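I found an example that models producer-consumer with two threads. However, when I send a stop signal to the process, it does not stop. It waits for a second signal, e.g. SIGKILL, before it stops completely. I thought the problem was with task_done(), but that does not seem to be the case.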

import time

import queue
import threading
import random


class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Append random integers to the integers
        list at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)


class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, queue):
        """
        Constructor.

        @param integers list of integers
        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            integer = self.queue.get()
            print('%d popped from list by %s' % (integer, self.name))
            self.queue.task_done()


def main():
    q = queue.Queue()
    t1 = Producer(q)
    t2 = Consumer(q)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    main()

Output:

210 put to queue by Thread-1
210 popped from list by Thread-2
Traceback (most recent call last):
  File "/Users/abc/PycharmProjects/untitled1/ssid.py", line 74, in <module>
    main()
  File "/Users/abc/PycharmProjects/untitled1/ssid.py", line 69, in main
    t1.join()
  File "/usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
244 put to queue by Thread-1
244 popped from list by Thread-2
85 put to queue by Thread-1
85 popped from list by Thread-2
160 put to queue by Thread-1
160 popped from list by Thread-2

This happens because the KeyboardInterrupt stops only the main thread. You can verify this by having your child threads print threading.enumerate(), which returns all alive threads plus the main thread.

import time
import queue
import threading
import random


class Producer(threading.Thread):

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer)
            print(f'{integer} put to queue by {self.name} '
                  f'threads: {threading.enumerate()}')
            time.sleep(1)


class Consumer(threading.Thread):

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = self.queue.get()
            print(f'{integer} popped from list by {self.name} '
                  f'threads:{threading.enumerate()}')
            self.queue.task_done()


def main():
    q = queue.Queue()
    t1 = Producer(q)
    t2 = Consumer(q)
    # t1.daemon = True
    # t2.daemon = True
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('got KeyboardInterrupt')

Example output with a KeyboardInterrupt. Note that MainThread is listed as 'stopped' after the KeyboardInterrupt:
97 put to queue by Thread-1 threads: [<_MainThread(MainThread, started 
139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
<Consumer(Thread-2, started 139810242520832)>]
97 popped from list by Thread-2 threads:[<_MainThread(MainThread, started 
139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
<Consumer(Thread-2, started 139810242520832)>]
got KeyboardInterrupt
92 put to queue by Thread-1 threads: [<_MainThread(MainThread, stopped 
139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
<Consumer(Thread-2, started 139810242520832)>]
92 popped from list by Thread-2 threads:[<_MainThread(MainThread, stopped 
139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
<Consumer(Thread-2, started 139810242520832)>]

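You can make the child threads daemon threads (see the commented-out daemon lines in main() above) so that they exit together with the main thread. But this should only be considered if your threads do not hold any resources: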

Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event. (From the Python threading documentation.)
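
The Event mechanism mentioned in the note could look roughly like the sketch below. It is not part of the original example: the stop_event parameter, the consumed counter, the 0.1 second timeouts and the run_until_stopped() helper with its duration argument are all assumptions made for illustration.

```python
import queue
import random
import threading
import time


class Producer(threading.Thread):

    def __init__(self, q, stop_event):
        super().__init__()
        self.q = q
        self.stop_event = stop_event  # assumption: shared threading.Event

    def run(self):
        while not self.stop_event.is_set():
            self.q.put(random.randint(0, 256))
            # wait() works as an interruptible sleep: it returns as soon
            # as the event is set instead of sleeping the full interval.
            self.stop_event.wait(0.1)


class Consumer(threading.Thread):

    def __init__(self, q, stop_event):
        super().__init__()
        self.q = q
        self.stop_event = stop_event
        self.consumed = 0  # illustrative counter, not in the original

    def run(self):
        while not self.stop_event.is_set():
            try:
                # The timeout keeps get() from blocking forever, so the
                # loop can re-check the event.
                self.q.get(timeout=0.1)
            except queue.Empty:
                continue
            self.consumed += 1
            self.q.task_done()


def run_until_stopped(duration=None):
    """Run both threads until Ctrl+C, or for `duration` seconds if given."""
    stop_event = threading.Event()
    q = queue.Queue()
    threads = [Producer(q, stop_event), Consumer(q, stop_event)]
    for t in threads:
        t.start()
    try:
        if duration is None:
            while True:
                time.sleep(0.1)
        else:
            time.sleep(duration)
    except KeyboardInterrupt:
        print('got KeyboardInterrupt')
    finally:
        # Ask both threads to finish, then wait for them.
        stop_event.set()
        for t in threads:
            t.join()
    return threads
```

Calling run_until_stopped() with no argument runs until Ctrl+C is pressed. Note that with this approach the consumer may leave items in the queue if the event is set while the queue is not empty.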

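A better approach than daemon threads is to catch the KeyboardInterrupt, as in the code above, and send a sentinel value through the queue to the child threads. This tells them they should finish and gives them a chance to clean up before exiting.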
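
A minimal sketch of the sentinel idea, under some assumptions: SENTINEL, the stop_requested event, the consumed list and the shutdown() helper are hypothetical names that do not appear in the original code. Because the producer never reads from the queue, this sketch stops it with an Event and uses the sentinel only for the consumer.

```python
import queue
import random
import threading
import time

# Hypothetical marker object; the producer can never emit it by accident.
SENTINEL = object()


class Producer(threading.Thread):

    def __init__(self, q):
        super().__init__()
        self.q = q
        self.stop_requested = threading.Event()  # assumption, see lead-in

    def run(self):
        while not self.stop_requested.is_set():
            self.q.put(random.randint(0, 256))
            self.stop_requested.wait(0.1)  # interruptible sleep


class Consumer(threading.Thread):

    def __init__(self, q):
        super().__init__()
        self.q = q
        self.consumed = []  # illustrative record of processed items

    def run(self):
        while True:
            item = self.q.get()
            try:
                if item is SENTINEL:
                    break  # no more work will arrive
                self.consumed.append(item)
            finally:
                # Runs for the sentinel too, so q.join() would not hang.
                self.q.task_done()
        # Any clean-up (closing files, etc.) would go here.


def shutdown(producer, consumer, q):
    """Stop the producer first, then tell the consumer to finish."""
    producer.stop_requested.set()
    producer.join()
    # Placed after the producer's last item, so nothing is left unread.
    q.put(SENTINEL)
    consumer.join()


def main():
    q = queue.Queue()
    producer, consumer = Producer(q), Consumer(q)
    producer.start()
    consumer.start()
    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print('got KeyboardInterrupt')
    finally:
        shutdown(producer, consumer, q)
```

The main thread sleeps in a loop rather than blocking in join(), and the threads are joined only after they have been asked to stop. With several consumers, one sentinel per consumer would be needed.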