涉及线程的生成器中看似未处理的异常

Seemingly unhandled exception in a generator with threads involved

考虑下面的代码。

目的是有一个管理线程的生成器。该线程负责获取数据,推送到大小为 1 的 fifo,生成器从中弹出并产生。

它正常运行,打印出一个不断增加的整数,与close thread set? False交错。

现在我希望 foo 的主线程中的 finally: 块 运行 总是被调用,例如 ctrl-c,在这种情况下 close_thread 将被设置并且线程将退出。

不幸的是,这似乎并没有可靠地发生,当用 ctrl-c 中断时的合理时间比例,主线程停止但获取者线程继续(打印出 close thread set? False 直到我杀死它)。我想知道异常是否没有传递给包装器对象中的生成器。

我是不是漏掉了一些明显的东西?

from threading import Thread, Event
from collections import deque
from time import sleep

def foo():

    # As used, deque is thread safe
    processing_fifo = deque([], maxlen=1)
    close_thread = Event()
    close_thread.clear()

    def acquirer():
        a = 0

        try:
            while True:
                processing_fifo.append(a)
                a += 1

                print 'close thread set?', close_thread.is_set()                
                if close_thread.is_set():
                    break

                sleep(1e-6)

        finally: # Make sure we capture any exceptions in the thread
            close_thread.set()

    try:
        acquirer_thread = Thread(target=acquirer)
        acquirer_thread.start()

        while True:
            try:
                yield processing_fifo.popleft()

            except IndexError:
                if close_thread.is_set():
                    break

            sleep(1e-6)

    finally:
        close_thread.set()
        acquirer_thread.join()


class Bleh(object):

    def __init__(self):

        self.gen = foo()

    def eep(self):

        return self.gen.next()

obj = Bleh()

while True:
    print obj.eep()

编辑 使线程成为守护线程似乎就足够了 (acquirer_thread.daemon = True)。在这种情况下,子线程内的 finally: 块会可靠地执行。不过,这并没有真正让我更深入地了解这个问题。

当您按下 ctrl-c 时,它只会中断主线程,该线程退出但离开您的 Python 进程 运行 因为您生成的线程保持 运行 - 它们确实如此没有收到 KeyboardInterrupt.

因此,如果您不将线程设置为 daemon = true,您的程序将不会退出。 From the documention:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property.

在这种情况下,当您的线程被设置为守护进程时,当您 ctrl-c 导致您的 except 块执行的程序时,它们也将被迫退出。

尽管您可以将线程设置为 daemon=True,但还有另一种方法。尝试类似的东西:

keep_running = True
def acquirer():
    try:
        while keep_running:
            ...

然后在您的主线程中捕获 KeyboardInterrupt 异常:

try:
    ...
except KeyboardInterrupt:
    # thread will see this change on next iteration and exit
    keep_running = False 

问题是当主线程不在对 obj.eep 的调用中时,有时会发生 KeyboardInterrupt。在 Python 中,信号(就像您执行 Ctrl+C 时发出的 SIGINT)are only delivered to the main thread:

[O]nly the main thread can set a new signal handler, and the main thread will be the only one to receive signals (this is enforced by the Python signal module, even if the underlying thread implementation supports sending signals to individual threads)

如果 SIGINT 发生在 obj.eep 之外,则不会发生异常处理,因此您的事件对象永远不会被设置。如果您在每次调用 obj.eep().

后添加一个短暂的睡眠,您可以使它更一致地发生

如果您将事件对象设为全局对象,并在 while True: obj.eep() 周围添加一个 try/finally,问题就会消失:

from threading import Thread, Event
from collections import deque
from time import sleep

def foo():
    # As used, deque is thread safe
    processing_fifo = deque([], maxlen=1)
    #close_thread = Event()
    close_thread.clear()

    def acquirer():
        a = 0

        try:
            while True:
                processing_fifo.append(a)
                a += 1

                print 'close thread set?', close_thread.is_set()                
                if close_thread.is_set():
                    break

                sleep(1e-6)

        finally: # Make sure we capture any exceptions in the thread
            close_thread.set()

    try:
        acquirer_thread = Thread(target=acquirer)
        acquirer_thread.start()

        while True:
            try:
                yield processing_fifo.popleft()

            except IndexError:
                if close_thread.is_set():
                    break

            sleep(1e-6)

    finally:
        close_thread.set()
        acquirer_thread.join()


class Bleh(object):

    def __init__(self):

        self.gen = foo()

    def eep(self):

        return self.gen.next()

close_thread = Event()
obj = Bleh()

try:
    while True:
        print obj.eep()
finally:
    close_thread.set()