Python 中的可中止睡眠 ()
abortable sleep() in Python
我需要一个可以中止的 sleep()
方法(如 here or here 所述)。
我的方法是让 threading.Event.wait()
在指定的持续时间内超时:
def abortable_sleep(secs, abort_event):
abort_event.wait(timeout=secs)
abort_event.clear()
调用 abortable_sleep(10, _abort)
后,我现在可以(从另一个线程)调用 _event.set(_abort)
让 abortable_sleep()
在 10 秒前终止。
示例:
def sleeping_thread():
_start = time.perf_counter()
print("%f thread started" % (time.perf_counter() - _start))
abortable_sleep(5, _abort)
print("%f thread stopped" % (time.perf_counter() - _start))
if __name__ == '__main__':
_abort = threading.Event()
while True:
threading.Thread(target=sleeping_thread).start()
time.sleep(3)
_abort.set()
time.sleep(1)
输出:
0.000001 thread started
3.002668 thread stopped
0.000002 thread started
3.003014 thread stopped
0.000001 thread started
3.002928 thread stopped
0.000001 thread started
此代码按预期工作,但我仍有一些问题:
- 没有 更简单 的方法来获得 s.th。喜欢
sleep()
哪个可以中止?
- 这样可以更优雅吗?例如。这样我必须小心未绑定到
abortable_sleep()
实例的 Event
实例
- 我是否必须期待 性能问题 像
while True: abortable_sleep(0.0001)
这样的高频循环? wait()-超时是如何实现的?
我有一个包装器 class,它基本上在 Event
之上添加了一些睡眠语义。好处是你只需要传递一个 Sleep
对象,如果你愿意,你可以多次调用 sleep()
(虽然 sleep()
不是线程安全的)并且你可以wake()
来自另一个线程。
from threading import Event
class Sleep(object):
def __init__(self, seconds, immediate=True):
self.seconds = seconds
self.event = Event()
if immediate:
self.sleep()
def sleep(self, seconds=None):
if seconds is None:
seconds = self.seconds
self.event.clear()
self.event.wait(timeout=seconds)
def wake(self):
self.event.set()
用法示例:
if __name__ == '__main__':
from threading import Thread
import time
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(created)d - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("sleep")
s = Sleep(3)
logger.info("awake")
def wake_it(sleeper):
time.sleep(1)
logger.info("wakeup!")
sleeper.wake()
logger.info("sleeping again")
s = Sleep(60, immediate=False)
Thread(target=wake_it, args=[s]).start()
s.sleep()
logger.info("awake again")
上面的输出可能是这样的:
1423750549 - sleep
1423750552 - awake
1423750552 - sleeping again
1423750553 - wakeup!
1423750553 - awake again
正是您所做的,但封装在 class.
中
由于竞争条件,您的解决方案并不总是完全正确。您应该改用 threading.BoundedSemaphore()
。创建后立即调用aquire()
。当你想睡觉时,调用 acquire()
超时,然后调用 release()
如果 acquire()
返回 true。要提前中止睡眠,请从不同的线程调用 release()
;如果没有正在进行的睡眠,这将提高 ValueError
。
如果另一个线程在错误的时间调用 set()
(即在您实际等待事件的任何时间以外的任何时间),则使用事件代替是有问题的。
我会将 sleep/abort 函数包装在一个新的 class 中:
class AbortableSleep():
def __init__(self):
self._condition = threading.Condition()
def __call__(self, secs):
with self._condition:
self._aborted = False
self._condition.wait(timeout=secs)
return not self._aborted
def abort(self):
with self._condition:
self._condition.notify()
self._aborted = True
然后我还会提供一个 Thread
subclass 来管理基于每个线程的唤醒例程的共享:
class ThreadWithWakeup(threading.Thread):
def __init__(self, *args, **kwargs):
self.abortable_sleep = AbortableSleep()
super(ThreadWithWakeup, self).__init__(*args, **kwargs)
def wakeup(self):
self.abortable_sleep.abort()
任何其他有权访问此线程的线程都可以调用 wakeup()
来中止当前 abortable_sleep()
(如果正在进行)。
使用 ThreadWithWakeup
您可以使用 ThreadWithWakeup
class 创建线程,并像这样使用它:
class MyThread(ThreadWithWakeup):
def run(self):
print "Sleeper: sleeping for 10"
if self.abortable_sleep(10):
print "Sleeper: awoke naturally"
else:
print "Sleeper: rudely awoken"
t = MyThread()
t.start()
print "Main: sleeping for 5"
for i in range(5):
time.sleep(1)
print i + 1
print "Main: waking thread"
t.wakeup()
其输出如下所示:
Sleeper: sleeping for 10
Main: sleeping for 5
1
2
3
4
5
Main: waking thread
Sleeper: rudely awoken
单独使用 AbortableSleep
您也可以单独使用 AbortableSleep
class,如果您由于某种原因不能使用 ThreadWithWakeup
class(也许您'在主线程中,也许其他东西会为您创建线程,等等):
abortable_sleep = AbortableSleep()
def run():
print "Sleeper: sleeping for 10"
if abortable_sleep(10):
print "Sleeper: awoke naturally"
else:
print "Sleeper: rudely awoken"
threading.Thread(target=run).start()
print "Main: sleeping for 5"
for i in range(5):
time.sleep(1)
print i + 1
print "Main: aborting"
abortable_sleep.abort()
我需要一个可以中止的 sleep()
方法(如 here or here 所述)。
我的方法是让 threading.Event.wait()
在指定的持续时间内超时:
def abortable_sleep(secs, abort_event):
abort_event.wait(timeout=secs)
abort_event.clear()
调用 abortable_sleep(10, _abort)
后,我现在可以(从另一个线程)调用 _event.set(_abort)
让 abortable_sleep()
在 10 秒前终止。
示例:
def sleeping_thread():
_start = time.perf_counter()
print("%f thread started" % (time.perf_counter() - _start))
abortable_sleep(5, _abort)
print("%f thread stopped" % (time.perf_counter() - _start))
if __name__ == '__main__':
_abort = threading.Event()
while True:
threading.Thread(target=sleeping_thread).start()
time.sleep(3)
_abort.set()
time.sleep(1)
输出:
0.000001 thread started
3.002668 thread stopped
0.000002 thread started
3.003014 thread stopped
0.000001 thread started
3.002928 thread stopped
0.000001 thread started
此代码按预期工作,但我仍有一些问题:
- 没有 更简单 的方法来获得 s.th。喜欢
sleep()
哪个可以中止? - 这样可以更优雅吗?例如。这样我必须小心未绑定到
abortable_sleep()
实例的 - 我是否必须期待 性能问题 像
while True: abortable_sleep(0.0001)
这样的高频循环? wait()-超时是如何实现的?
Event
实例
我有一个包装器 class,它基本上在 Event
之上添加了一些睡眠语义。好处是你只需要传递一个 Sleep
对象,如果你愿意,你可以多次调用 sleep()
(虽然 sleep()
不是线程安全的)并且你可以wake()
来自另一个线程。
from threading import Event
class Sleep(object):
def __init__(self, seconds, immediate=True):
self.seconds = seconds
self.event = Event()
if immediate:
self.sleep()
def sleep(self, seconds=None):
if seconds is None:
seconds = self.seconds
self.event.clear()
self.event.wait(timeout=seconds)
def wake(self):
self.event.set()
用法示例:
if __name__ == '__main__':
from threading import Thread
import time
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(created)d - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("sleep")
s = Sleep(3)
logger.info("awake")
def wake_it(sleeper):
time.sleep(1)
logger.info("wakeup!")
sleeper.wake()
logger.info("sleeping again")
s = Sleep(60, immediate=False)
Thread(target=wake_it, args=[s]).start()
s.sleep()
logger.info("awake again")
上面的输出可能是这样的:
1423750549 - sleep
1423750552 - awake
1423750552 - sleeping again
1423750553 - wakeup!
1423750553 - awake again
正是您所做的,但封装在 class.
中由于竞争条件,您的解决方案并不总是完全正确。您应该改用 threading.BoundedSemaphore()
。创建后立即调用aquire()
。当你想睡觉时,调用 acquire()
超时,然后调用 release()
如果 acquire()
返回 true。要提前中止睡眠,请从不同的线程调用 release()
;如果没有正在进行的睡眠,这将提高 ValueError
。
如果另一个线程在错误的时间调用 set()
(即在您实际等待事件的任何时间以外的任何时间),则使用事件代替是有问题的。
我会将 sleep/abort 函数包装在一个新的 class 中:
class AbortableSleep():
def __init__(self):
self._condition = threading.Condition()
def __call__(self, secs):
with self._condition:
self._aborted = False
self._condition.wait(timeout=secs)
return not self._aborted
def abort(self):
with self._condition:
self._condition.notify()
self._aborted = True
然后我还会提供一个 Thread
subclass 来管理基于每个线程的唤醒例程的共享:
class ThreadWithWakeup(threading.Thread):
def __init__(self, *args, **kwargs):
self.abortable_sleep = AbortableSleep()
super(ThreadWithWakeup, self).__init__(*args, **kwargs)
def wakeup(self):
self.abortable_sleep.abort()
任何其他有权访问此线程的线程都可以调用 wakeup()
来中止当前 abortable_sleep()
(如果正在进行)。
使用 ThreadWithWakeup
您可以使用 ThreadWithWakeup
class 创建线程,并像这样使用它:
class MyThread(ThreadWithWakeup):
def run(self):
print "Sleeper: sleeping for 10"
if self.abortable_sleep(10):
print "Sleeper: awoke naturally"
else:
print "Sleeper: rudely awoken"
t = MyThread()
t.start()
print "Main: sleeping for 5"
for i in range(5):
time.sleep(1)
print i + 1
print "Main: waking thread"
t.wakeup()
其输出如下所示:
Sleeper: sleeping for 10
Main: sleeping for 5
1
2
3
4
5
Main: waking thread
Sleeper: rudely awoken
单独使用 AbortableSleep
您也可以单独使用 AbortableSleep
class,如果您由于某种原因不能使用 ThreadWithWakeup
class(也许您'在主线程中,也许其他东西会为您创建线程,等等):
abortable_sleep = AbortableSleep()
def run():
print "Sleeper: sleeping for 10"
if abortable_sleep(10):
print "Sleeper: awoke naturally"
else:
print "Sleeper: rudely awoken"
threading.Thread(target=run).start()
print "Main: sleeping for 5"
for i in range(5):
time.sleep(1)
print i + 1
print "Main: aborting"
abortable_sleep.abort()