Python 中的可中止睡眠 ()

abortable sleep() in Python

我需要一个可以中止的 sleep() 方法(如 here or here 所述)。

我的方法是让 threading.Event.wait() 在指定的持续时间内超时:

def abortable_sleep(secs, abort_event):
    abort_event.wait(timeout=secs)
    abort_event.clear()

调用 abortable_sleep(10, _abort) 后,我现在可以(从另一个线程)调用 _event.set(_abort)abortable_sleep() 在 10 秒前终止。

示例:

def sleeping_thread():
    _start = time.perf_counter()
    print("%f thread started" % (time.perf_counter() - _start))
    abortable_sleep(5, _abort)
    print("%f thread stopped" % (time.perf_counter() - _start))

if __name__ == '__main__':

    _abort = threading.Event()
    while True:
        threading.Thread(target=sleeping_thread).start()
        time.sleep(3)
        _abort.set()
        time.sleep(1)

输出:

0.000001 thread started
3.002668 thread stopped
0.000002 thread started
3.003014 thread stopped
0.000001 thread started
3.002928 thread stopped
0.000001 thread started

此代码按预期工作,但我仍有一些问题:

我有一个包装器 class,它基本上在 Event 之上添加了一些睡眠语义。好处是你只需要传递一个 Sleep 对象,如果你愿意,你可以多次调用 sleep() (虽然 sleep() 不是线程安全的)并且你可以wake() 来自另一个线程。

from threading import Event

class Sleep(object):
    def __init__(self, seconds, immediate=True):
        self.seconds = seconds
        self.event = Event()
        if immediate:
            self.sleep()

    def sleep(self, seconds=None):
        if seconds is None:
            seconds = self.seconds
        self.event.clear()
        self.event.wait(timeout=seconds)

    def wake(self):
        self.event.set()

用法示例:

if __name__ == '__main__':
    from threading import Thread
    import time
    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(created)d - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    logger.info("sleep")
    s = Sleep(3)
    logger.info("awake")

    def wake_it(sleeper):
        time.sleep(1)
        logger.info("wakeup!")
        sleeper.wake()

    logger.info("sleeping again")
    s = Sleep(60, immediate=False)
    Thread(target=wake_it, args=[s]).start()
    s.sleep()
    logger.info("awake again")

上面的输出可能是这样的:

1423750549 - sleep
1423750552 - awake
1423750552 - sleeping again
1423750553 - wakeup!
1423750553 - awake again

正是您所做的,但封装在 class.

由于竞争条件,您的解决方案并不总是完全正确。您应该改用 threading.BoundedSemaphore()。创建后立即调用aquire()。当你想睡觉时,调用 acquire() 超时,然后调用 release() 如果 acquire() 返回 true。要提前中止睡眠,请从不同的线程调用 release();如果没有正在进行的睡眠,这将提高 ValueError

如果另一个线程在错误的时间调用 set()(即在您实际等待事件的任何时间以外的任何时间),则使用事件代替是有问题的。

我会将 sleep/abort 函数包装在一个新的 class 中:

class AbortableSleep():
    def __init__(self):
        self._condition = threading.Condition()

    def __call__(self, secs):
        with self._condition:
            self._aborted = False
            self._condition.wait(timeout=secs)
            return not self._aborted

    def abort(self):
        with self._condition:
            self._condition.notify()
            self._aborted = True

然后我还会提供一个 Thread subclass 来管理基于每个线程的唤醒例程的共享:

class ThreadWithWakeup(threading.Thread):
    def __init__(self, *args, **kwargs):
        self.abortable_sleep = AbortableSleep()
        super(ThreadWithWakeup, self).__init__(*args, **kwargs)

    def wakeup(self):
        self.abortable_sleep.abort()

任何其他有权访问此线程的线程都可以调用 wakeup() 来中止当前 abortable_sleep()(如果正在进行)。


使用 ThreadWithWakeup

您可以使用 ThreadWithWakeup class 创建线程,并像这样使用它:

class MyThread(ThreadWithWakeup):
    def run(self):
        print "Sleeper: sleeping for 10"
        if self.abortable_sleep(10):
            print "Sleeper: awoke naturally"
        else:
            print "Sleeper: rudely awoken"

t = MyThread()
t.start()
print "Main: sleeping for 5"
for i in range(5):
    time.sleep(1)
    print i + 1 
print "Main: waking thread"
t.wakeup()

其输出如下所示:

Sleeper: sleeping for 10
Main: sleeping for 5
1
2
3
4
5
Main: waking thread
Sleeper: rudely awoken

单独使用 AbortableSleep

您也可以单独使用 AbortableSleep class,如果您由于某种原因不能使用 ThreadWithWakeup class(也许您'在主线程中,也许其他东西会为您创建线程,等等):

abortable_sleep = AbortableSleep()
def run():
    print "Sleeper: sleeping for 10"
    if abortable_sleep(10):
        print "Sleeper: awoke naturally"
    else:
        print "Sleeper: rudely awoken"
threading.Thread(target=run).start()

print "Main: sleeping for 5"
for i in range(5):
    time.sleep(1)
    print i + 1
print "Main: aborting"
abortable_sleep.abort()