如何优雅地终止使用 queue.Queue 的多线程 Python 应用程序

How to gracefully terminate a multithreaded Python application that uses queue.Queue

我尝试让我的应用程序优雅地终止已经很长一段时间了,但到目前为止,我找到的答案没有一个有效。

下面的示例代码说明了我的应用程序的结构。它基本上是一个线程链,使用队列将数据相互传递。

from abc import abstractmethod
from time import sleep
from threading import Thread, Event
from queue import Queue
import signal
import sys


class StoppableThread(Thread):
    """Thread that repeats ``actual_job`` until it is asked to stop.

    Every instance owns an ``Event`` (``stopper``) that serves as the stop
    flag and a ``Queue`` (``queue``) associated with the thread.

    Note: ``@abstractmethod`` is only enforced on classes whose metaclass is
    ``ABCMeta``. This class derives from ``Thread`` alone, so the decorator
    acts as a marker and does not prevent instantiation.
    """

    def __init__(self):
        super().__init__()
        # Queue associated with this thread.
        self.queue = Queue()
        # Stop flag: set by stop_running(), polled by run().
        self.stopper = Event()

    def stop_running(self):
        """Set the stop flag so ``run`` leaves its loop at the next check."""
        self.stopper.set()

    @abstractmethod
    def actual_job(self):
        """Perform one unit of work; subclasses supply the implementation."""

    def run(self):
        """Call ``actual_job`` over and over until the stop flag is set.

        Once the loop ends, the thread waits on ``self.queue.join()``, which
        returns only after ``task_done()`` has been called for every item
        that was put on the queue.
        """
        while True:
            if self.stopper.is_set():
                break
            # Trace of the flag's state at the start of each iteration.
            print(self.stopper.is_set())
            self.actual_job()
        self.queue.join()

class SomeObjectOne(StoppableThread):
    """Worker that produces a fixed string and hands it to another worker."""

    def __init__(self, name, some_object_two):
        """Store the thread name and the worker whose queue receives output.

        Args:
            name: Value assigned to the thread's ``name``.
            some_object_two: Object exposing a ``queue`` attribute to put
                items on.
        """
        super().__init__()
        self.obj_two = some_object_two
        self.name = name

    def actual_job(self):
        """Print the fixed string, enqueue it downstream, then sleep 2 s."""
        message = 'some string'
        print('{} outputs {}'.format(self.name, message))
        target_queue = self.obj_two.queue
        target_queue.put(message)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    """Worker that reverses each item from its queue and forwards it."""

    def __init__(self, name, some_object_three):
        """Store the thread name and the worker whose queue receives output.

        Args:
            name: Value assigned to the thread's ``name``.
            some_object_three: Object exposing a ``queue`` attribute to put
                items on.
        """
        super().__init__()
        self.some_object_three = some_object_three
        self.name = name

    def actual_job(self):
        """Take one item, reverse it, print it, forward it, then sleep 2 s.

        ``self.queue.get()`` blocks until an item is available. This method
        does not call ``task_done()`` on ``self.queue``.
        """
        received = self.queue.get()
        flipped = received[::-1]
        print('{} outputs {}'.format(self.name, flipped))
        next_queue = self.some_object_three.queue
        next_queue.put(flipped)
        sleep(2)


class SomeObjectThree(StoppableThread):
    """Worker that prints each item from its queue in reversed order."""

    def __init__(self, name):
        """Store ``name`` as the thread's name."""
        super().__init__()
        self.name = name

    def actual_job(self):
        """Announce itself, take one item, print it reversed, sleep 2 s.

        ``self.queue.get()`` blocks until an item is available. This method
        does not call ``task_done()`` on ``self.queue``.
        """
        print('{} is currently running'.format(self.name))
        incoming = self.queue.get()
        flipped = incoming[::-1]
        print('{} outputs {}'.format(self.name, flipped))
        sleep(2)




class ServiceExit(Exception):
    """Signal that the program should shut down cleanly.

    Raised to trigger an orderly exit of all running threads and of the
    main program.
    """

def service_shutdown(signum, frame):
    """Signal handler: report the signal number and raise ``ServiceExit``.

    Args:
        signum: Number of the signal that was received.
        frame: Stack frame passed to the handler; unused.

    Raises:
        ServiceExit: Always.
    """
    notice = 'Caught signal %d' % signum
    print(notice)
    raise ServiceExit()

# Route both Ctrl-C (SIGINT) and SIGTERM to the same shutdown handler.
signal.signal(signal.SIGINT, service_shutdown)
signal.signal(signal.SIGTERM, service_shutdown)

if __name__ == '__main__':
    # Build the chain back to front, because each stage needs a reference to
    # the stage it feeds.
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)
    consumers_first = (thread_three, thread_two, thread_one)

    try:
        for worker in consumers_first:
            worker.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        # Ask every worker to stop, then wait for them starting with the
        # first stage of the chain.
        for worker in consumers_first:
            worker.stop_running()
        for worker in reversed(consumers_first):
            worker.join()
        sys.exit(0)

现在,如果我运行此代码并按 Ctrl-C 终止,thread_one 似乎按预期 join 了,但代码卡在 thread_two.join()。

因为 thread_one 是唯一一个队列始终为空的线程,我猜问题与队列有关。

有什么想法吗?

在 StoppableThread 的 run() 方法中,你有这一行:

self.queue.join()

join() is a blocking method:

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

所以,要让 join() 返回,仅仅在另一个线程中用 get() 取出项目是不够的,你还必须调用 task_done() 表明该项目已处理完毕:

from abc import abstractmethod
from time import sleep
from threading import Thread, Event
from queue import Queue
import signal
import sys

class StoppableThread(Thread):
    """Thread that repeats ``actual_job`` until it is asked to stop.

    Every instance owns an ``Event`` (``stopper``) that serves as the stop
    flag and a ``Queue`` (``queue``) associated with the thread.

    Note: ``@abstractmethod`` is only enforced on classes whose metaclass is
    ``ABCMeta``. This class derives from ``Thread`` alone, so the decorator
    acts as a marker and does not prevent instantiation.
    """

    def __init__(self):
        super().__init__()
        # Queue associated with this thread.
        self.queue = Queue()
        # Stop flag: set by stop_running(), polled by run().
        self.stopper = Event()

    def stop_running(self):
        """Set the stop flag so ``run`` leaves its loop at the next check."""
        self.stopper.set()

    @abstractmethod
    def actual_job(self):
        """Perform one unit of work; subclasses supply the implementation."""

    def run(self):
        """Call ``actual_job`` over and over until the stop flag is set.

        Once the loop ends, the thread waits on ``self.queue.join()``, which
        returns only after ``task_done()`` has been called for every item
        that was put on the queue.
        """
        while True:
            if self.stopper.is_set():
                break
            # Trace of the flag's state at the start of each iteration.
            print(self.stopper.is_set())
            self.actual_job()
        self.queue.join()

class SomeObjectOne(StoppableThread):
    """Worker that produces a fixed string and hands it to another worker."""

    def __init__(self, name, some_object_two):
        """Store the thread name and the worker whose queue receives output.

        Args:
            name: Value assigned to the thread's ``name``.
            some_object_two: Object exposing a ``queue`` attribute to put
                items on.
        """
        super().__init__()
        self.obj_two = some_object_two
        self.name = name

    def actual_job(self):
        """Print the fixed string, enqueue it downstream, then sleep 2 s."""
        message = 'some string'
        print('{} outputs {}'.format(self.name, message))
        target_queue = self.obj_two.queue
        target_queue.put(message)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    """Worker that reverses each item from its queue and forwards it."""

    def __init__(self, name, some_object_three):
        """Store the thread name and the worker whose queue receives output.

        Args:
            name: Value assigned to the thread's ``name``.
            some_object_three: Object exposing a ``queue`` attribute to put
                items on.
        """
        super().__init__()
        self.some_object_three = some_object_three
        self.name = name

    def actual_job(self):
        """Take one item, reverse it, print it, forward it, then sleep 2 s.

        ``self.queue.get()`` blocks until an item is available. After the
        item has been printed it is marked finished with ``task_done()`` so
        that ``self.queue.join()`` can return.
        """
        received = self.queue.get()
        flipped = received[::-1]
        print('{} outputs {}'.format(self.name, flipped))
        # Balance the get() above; join() waits for this call.
        self.queue.task_done()
        next_queue = self.some_object_three.queue
        next_queue.put(flipped)
        sleep(2)

class SomeObjectThree(StoppableThread):
    """Worker that prints each item from its queue in reversed order."""

    def __init__(self, name):
        """Store ``name`` as the thread's name."""
        super().__init__()
        self.name = name

    def actual_job(self):
        """Announce itself, take one item, print it reversed, sleep 2 s.

        ``self.queue.get()`` blocks until an item is available. After the
        item has been printed it is marked finished with ``task_done()`` so
        that ``self.queue.join()`` can return.
        """
        print('{} is currently running'.format(self.name))
        incoming = self.queue.get()
        flipped = incoming[::-1]
        print('{} outputs {}'.format(self.name, flipped))
        # Balance the get() above; join() waits for this call.
        self.queue.task_done()
        sleep(2)

class ServiceExit(Exception):
    """Signal that the program should shut down cleanly.

    Raised to trigger an orderly exit of all running threads and of the
    main program.
    """

def service_shutdown(signum, frame):
    """Signal handler: report the signal number and raise ``ServiceExit``.

    Args:
        signum: Number of the signal that was received.
        frame: Stack frame passed to the handler; unused.

    Raises:
        ServiceExit: Always.
    """
    notice = 'Caught signal %d' % signum
    print(notice)
    raise ServiceExit()

# Route both Ctrl-C (SIGINT) and SIGTERM to the same shutdown handler.
signal.signal(signal.SIGINT, service_shutdown)
signal.signal(signal.SIGTERM, service_shutdown)

if __name__ == '__main__':
    # Build the chain back to front, because each stage needs a reference to
    # the stage it feeds.
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)
    consumers_first = (thread_three, thread_two, thread_one)

    try:
        for worker in consumers_first:
            worker.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        # Ask every worker to stop, then wait for them starting with the
        # first stage of the chain.
        for worker in consumers_first:
            worker.stop_running()
        for worker in reversed(consumers_first):
            worker.join()