Python - asyncio/coroutine 计时器
Python - Timer with asyncio/coroutine
我正在尝试设置一个计时器,它将中断 运行 进程并在它触发时调用协程。但是,我不确定完成此操作的正确方法是什么。我找到了 AbstractEventLoop.call_later 和 threading.Timer,但这些似乎都不起作用(或者我使用不当)。代码非常基本,看起来像这样:
def set_timer( time ):
self.timer = Timer( 10.0, timeout )
self.timer.start()
#v2
#self.timer = get_event_loop()
#self.timer.call_later( 10.0, timeout )
return
async def timeout():
await some_func()
return
设置非阻塞计时器的正确方法是什么,它会在几秒后调用回调函数?能够取消计时器将是一种奖励,但不是必需的。我需要的主要东西是:非阻塞和成功调用协程。现在它 returns 一个错误,对象不能被等待(如果我抛出一个等待)或者 some_func 从未被等待,并且预期的输出永远不会发生。
正在创建 Task using ensure_future is a common way to start some job executing without blocking your execution flow. You can also cancel 个任务。
我为您编写了示例实现以供您开始:
import asyncio
class Timer:
def __init__(self, timeout, callback):
self._timeout = timeout
self._callback = callback
self._task = asyncio.ensure_future(self._job())
async def _job(self):
await asyncio.sleep(self._timeout)
await self._callback()
def cancel(self):
self._task.cancel()
async def timeout_callback():
await asyncio.sleep(0.1)
print('echo!')
async def main():
print('\nfirst example:')
timer = Timer(2, timeout_callback) # set timer for two seconds
await asyncio.sleep(2.5) # wait to see timer works
print('\nsecond example:')
timer = Timer(2, timeout_callback) # set timer for two seconds
await asyncio.sleep(1)
timer.cancel() # cancel it
await asyncio.sleep(1.5) # and wait to see it won't call callback
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
输出:
first example:
echo!
second example:
感谢 Mikhail Gerasimov 的回答,非常有用。这是 Mikhail 的答案的扩展。这是一个有一些曲折的间隔计时器。也许对某些用户有用。
import asyncio
class Timer:
def __init__(self, interval, first_immediately, timer_name, context, callback):
self._interval = interval
self._first_immediately = first_immediately
self._name = timer_name
self._context = context
self._callback = callback
self._is_first_call = True
self._ok = True
self._task = asyncio.ensure_future(self._job())
print(timer_name + " init done")
async def _job(self):
try:
while self._ok:
if not self._is_first_call or not self._first_immediately:
await asyncio.sleep(self._interval)
await self._callback(self._name, self._context, self)
self._is_first_call = False
except Exception as ex:
print(ex)
def cancel(self):
self._ok = False
self._task.cancel()
async def some_callback(timer_name, context, timer):
context['count'] += 1
print('callback: ' + timer_name + ", count: " + str(context['count']))
if timer_name == 'Timer 2' and context['count'] == 3:
timer.cancel()
print(timer_name + ": goodbye and thanks for all the fish")
timer1 = Timer(interval=1, first_immediately=True, timer_name="Timer 1", context={'count': 0}, callback=some_callback)
timer2 = Timer(interval=5, first_immediately=False, timer_name="Timer 2", context={'count': 0}, callback=some_callback)
try:
loop = asyncio.get_event_loop()
loop.run_forever()
except KeyboardInterrupt:
timer1.cancel()
timer2.cancel()
print("clean up done")
Mikhail 提出的解决方案有一个缺点。调用 cancel()
会同时取消:计时器和实际回调(如果 cancel()
在超时后触发,但实际作业仍在进行中)。取消作业本身可能不是理想的行为。
另一种方法是使用 loop.call_later
:
async def some_job():
print('Job started')
await asyncio.sleep(5)
print('Job is done')
loop = asyncio.get_event_loop() # or asyncio.get_running_loop()
timeout = 5
timer = loop.call_later(timeout, lambda: asyncio.ensure_future(some_job()))
timer.cancel() # cancels the timer, but not the job, if it's already started
我正在尝试设置一个计时器,它将中断 运行 进程并在它触发时调用协程。但是,我不确定完成此操作的正确方法是什么。我找到了 AbstractEventLoop.call_later 和 threading.Timer,但这些似乎都不起作用(或者我使用不当)。代码非常基本,看起来像这样:
def set_timer( time ):
self.timer = Timer( 10.0, timeout )
self.timer.start()
#v2
#self.timer = get_event_loop()
#self.timer.call_later( 10.0, timeout )
return
async def timeout():
await some_func()
return
设置非阻塞计时器的正确方法是什么,它会在几秒后调用回调函数?能够取消计时器将是一种奖励,但不是必需的。我需要的主要东西是:非阻塞和成功调用协程。现在它 returns 一个错误,对象不能被等待(如果我抛出一个等待)或者 some_func 从未被等待,并且预期的输出永远不会发生。
正在创建 Task using ensure_future is a common way to start some job executing without blocking your execution flow. You can also cancel 个任务。
我为您编写了示例实现以供您开始:
import asyncio
class Timer:
def __init__(self, timeout, callback):
self._timeout = timeout
self._callback = callback
self._task = asyncio.ensure_future(self._job())
async def _job(self):
await asyncio.sleep(self._timeout)
await self._callback()
def cancel(self):
self._task.cancel()
async def timeout_callback():
await asyncio.sleep(0.1)
print('echo!')
async def main():
print('\nfirst example:')
timer = Timer(2, timeout_callback) # set timer for two seconds
await asyncio.sleep(2.5) # wait to see timer works
print('\nsecond example:')
timer = Timer(2, timeout_callback) # set timer for two seconds
await asyncio.sleep(1)
timer.cancel() # cancel it
await asyncio.sleep(1.5) # and wait to see it won't call callback
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
输出:
first example:
echo!
second example:
感谢 Mikhail Gerasimov 的回答,非常有用。这是 Mikhail 的答案的扩展。这是一个有一些曲折的间隔计时器。也许对某些用户有用。
import asyncio
class Timer:
def __init__(self, interval, first_immediately, timer_name, context, callback):
self._interval = interval
self._first_immediately = first_immediately
self._name = timer_name
self._context = context
self._callback = callback
self._is_first_call = True
self._ok = True
self._task = asyncio.ensure_future(self._job())
print(timer_name + " init done")
async def _job(self):
try:
while self._ok:
if not self._is_first_call or not self._first_immediately:
await asyncio.sleep(self._interval)
await self._callback(self._name, self._context, self)
self._is_first_call = False
except Exception as ex:
print(ex)
def cancel(self):
self._ok = False
self._task.cancel()
async def some_callback(timer_name, context, timer):
context['count'] += 1
print('callback: ' + timer_name + ", count: " + str(context['count']))
if timer_name == 'Timer 2' and context['count'] == 3:
timer.cancel()
print(timer_name + ": goodbye and thanks for all the fish")
timer1 = Timer(interval=1, first_immediately=True, timer_name="Timer 1", context={'count': 0}, callback=some_callback)
timer2 = Timer(interval=5, first_immediately=False, timer_name="Timer 2", context={'count': 0}, callback=some_callback)
try:
loop = asyncio.get_event_loop()
loop.run_forever()
except KeyboardInterrupt:
timer1.cancel()
timer2.cancel()
print("clean up done")
Mikhail 提出的解决方案有一个缺点。调用 cancel()
会同时取消:计时器和实际回调(如果 cancel()
在超时后触发,但实际作业仍在进行中)。取消作业本身可能不是理想的行为。
另一种方法是使用 loop.call_later
:
async def some_job():
print('Job started')
await asyncio.sleep(5)
print('Job is done')
loop = asyncio.get_event_loop() # or asyncio.get_running_loop()
timeout = 5
timer = loop.call_later(timeout, lambda: asyncio.ensure_future(some_job()))
timer.cancel() # cancels the timer, but not the job, if it's already started