中断所有 asyncio.sleep 当前正在执行的
Interrupt all asyncio.sleep currently executing
哪里
这是在 Linux、Python 3.5.1.
什么
我正在用 asyncio
开发一个监控进程,其任务在不同的地方 await
在 asyncio.sleep
不同持续时间的调用上。
有时我希望能够中断所有上述 asyncio.sleep
调用并让所有任务正常进行,但我找不到如何做到这一点。一个示例是用于监控进程的正常关闭。
如何(假设失败)
我以为我可以发送一个 ALRM 信号来达到那个效果,但是这个过程死了。我尝试通过以下方式捕获 ALRM 信号:
def sigalrm_sent(signum, frame):
tse.logger.info("got SIGALRM")
signal.signal(signal.SIGALRM, sigalrm_sent)
然后我得到关于捕获 SIGALRM 的日志行,但是 asyncio.sleep
调用没有被中断。
如何(拼凑)
此时,我将所有 asyncio.sleep
调用替换为对该协程的调用:
async def interruptible_sleep(seconds):
while seconds > 0 and not tse.stop_requested:
duration = min(seconds, tse.TIME_QUANTUM)
await asyncio.sleep(duration)
seconds -= duration
所以我只需要挑一个不太小也不太大的TIME_QUANTUM
就可以了
但是
有没有办法打断所有 运行 asyncio.sleep
呼叫而我错过了?
中断 asyncio.sleep
的所有 运行 调用似乎有点危险,因为它可以用于代码的其他部分,用于其他目的。相反,我会制作一个专用的 sleep
协程来跟踪它的 运行 调用。然后可以通过取消相应的任务来中断它们:
def make_sleep():
async def sleep(delay, result=None, *, loop=None):
coro = asyncio.sleep(delay, result=result, loop=loop)
task = asyncio.ensure_future(coro)
sleep.tasks.add(task)
try:
return await task
except asyncio.CancelledError:
return result
finally:
sleep.tasks.remove(task)
sleep.tasks = set()
sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
return sleep
示例:
async def main(sleep, loop):
for i in range(10):
loop.create_task(sleep(i))
await sleep(3)
nb_cancelled = sleep.cancel_all()
await asyncio.wait(sleep.tasks)
return nb_cancelled
sleep = make_sleep()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(sleep, loop))
print(result) # Print '6'
出于调试目的,loop.time = lambda: float('inf')
也可以。
根据 Vincent 的回答,我使用了以下 class(class 的每个实例都可以取消其所有 运行 .sleep
任务,以便更好地划分):
class Sleeper:
"Group sleep calls allowing instant cancellation of all"
def __init__(self, loop):
self.loop = loop
self.tasks = set()
async def sleep(self, delay, result=None):
coro = aio.sleep(delay, result=result, loop=self.loop)
task = aio.ensure_future(coro)
self.tasks.add(task)
try:
return await task
except aio.CancelledError:
return result
finally:
self.tasks.remove(task)
def cancel_all_helper(self):
"Cancel all pending sleep tasks"
cancelled = set()
for task in self.tasks:
if task.cancel():
cancelled.add(task)
return cancelled
async def cancel_all(self):
"Coroutine cancelling tasks"
cancelled = self.cancel_all_helper()
await aio.wait(self.tasks)
self.tasks -= cancelled
return len(cancelled)
哪里
这是在 Linux、Python 3.5.1.
什么
我正在用 asyncio
开发一个监控进程,其任务在不同的地方 await
在 asyncio.sleep
不同持续时间的调用上。
有时我希望能够中断所有上述 asyncio.sleep
调用并让所有任务正常进行,但我找不到如何做到这一点。一个示例是用于监控进程的正常关闭。
如何(假设失败)
我以为我可以发送一个 ALRM 信号来达到那个效果,但是这个过程死了。我尝试通过以下方式捕获 ALRM 信号:
def sigalrm_sent(signum, frame):
tse.logger.info("got SIGALRM")
signal.signal(signal.SIGALRM, sigalrm_sent)
然后我得到关于捕获 SIGALRM 的日志行,但是 asyncio.sleep
调用没有被中断。
如何(拼凑)
此时,我将所有 asyncio.sleep
调用替换为对该协程的调用:
async def interruptible_sleep(seconds):
while seconds > 0 and not tse.stop_requested:
duration = min(seconds, tse.TIME_QUANTUM)
await asyncio.sleep(duration)
seconds -= duration
所以我只需要挑一个不太小也不太大的TIME_QUANTUM
就可以了
但是
有没有办法打断所有 运行 asyncio.sleep
呼叫而我错过了?
中断 asyncio.sleep
的所有 运行 调用似乎有点危险,因为它可以用于代码的其他部分,用于其他目的。相反,我会制作一个专用的 sleep
协程来跟踪它的 运行 调用。然后可以通过取消相应的任务来中断它们:
def make_sleep():
async def sleep(delay, result=None, *, loop=None):
coro = asyncio.sleep(delay, result=result, loop=loop)
task = asyncio.ensure_future(coro)
sleep.tasks.add(task)
try:
return await task
except asyncio.CancelledError:
return result
finally:
sleep.tasks.remove(task)
sleep.tasks = set()
sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
return sleep
示例:
async def main(sleep, loop):
for i in range(10):
loop.create_task(sleep(i))
await sleep(3)
nb_cancelled = sleep.cancel_all()
await asyncio.wait(sleep.tasks)
return nb_cancelled
sleep = make_sleep()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(sleep, loop))
print(result) # Print '6'
出于调试目的,loop.time = lambda: float('inf')
也可以。
根据 Vincent 的回答,我使用了以下 class(class 的每个实例都可以取消其所有 运行 .sleep
任务,以便更好地划分):
class Sleeper:
"Group sleep calls allowing instant cancellation of all"
def __init__(self, loop):
self.loop = loop
self.tasks = set()
async def sleep(self, delay, result=None):
coro = aio.sleep(delay, result=result, loop=self.loop)
task = aio.ensure_future(coro)
self.tasks.add(task)
try:
return await task
except aio.CancelledError:
return result
finally:
self.tasks.remove(task)
def cancel_all_helper(self):
"Cancel all pending sleep tasks"
cancelled = set()
for task in self.tasks:
if task.cancel():
cancelled.add(task)
return cancelled
async def cancel_all(self):
"Coroutine cancelling tasks"
cancelled = self.cancel_all_helper()
await aio.wait(self.tasks)
self.tasks -= cancelled
return len(cancelled)