如何使用 asyncio 定期执行函数?
How can I periodically execute a function with asyncio?
我正在从 tornado
迁移到 asyncio
,但找不到与 tornado
的 PeriodicCallback
等效的 asyncio
。 (A PeriodicCallback
有两个参数:运行 的函数和调用之间的毫秒数。)
asyncio
中有这样的等价物吗?
- 如果没有,什么是最干净的实现方式,而不 运行 避免在一段时间后获得
RecursionError
的风险?
没有对定期调用的内置支持,没有。
只需创建您自己的调度程序循环即可休眠并执行任何已安排的任务:
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
async for task in scheduled_tasks(time.time()):
await task()
scheduled_tasks()
迭代器应生成在给定时间准备好 运行 的任务。请注意,生成时间表并启动所有任务理论上可能需要超过 1 秒的时间;这里的想法是调度程序产生自上次检查以来应该开始的所有任务。
对于低于 3.5 的 Python 版本:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
对于 Python 3.5 及更高版本:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
当您觉得您的 asyncio 程序 "in background" 应该发生某些事情时,asyncio.Task
可能是实现它的好方法。您可以阅读 了解如何处理任务。
下面是 class 的可能实现,它会周期性地执行一些函数:
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
我们来测试一下:
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
正如您在 start
上看到的那样,我们只是开始调用一些函数并在无限循环中休眠一段时间的任务。在 stop
我们只是取消了那个任务。请注意,该任务应在程序完成时停止。
更重要的一点是您的回调不应花费太多时间来执行(否则它会冻结您的事件循环)。如果你打算调用一些 long-运行 func
,你可能需要 .
基于 (@Torkel Bjørnson-Langen 和@ReWrite 评论)这是一项避免漂移的改进。
import time
import asyncio
@asyncio.coroutine
def periodic(period):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * period - time.time(), 0)
g = g_tick()
while True:
print('periodic', time.time())
yield from asyncio.sleep(next(g))
loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
带有装饰器的替代版本 python 3.7
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
一个可能有用的变体:如果您希望在上次执行结束和下一次执行开始之间每隔 n 秒而不是 n 秒进行一次重复调用,并且您不希望调用重叠及时,以下更简单:
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
还有一个使用它来 运行 在后台执行几个任务的示例:
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这就是我用 asyncio 测试我的周期性回调理论所做的。我没有使用 Tornado 的经验,所以我不确定定期回调如何与它一起工作。不过,我习惯于在 Tkinter 中使用 after(ms, callback)
方法,这就是我想出的方法。 While True:
即使它是异步的(比全局更严重),我看起来也很难看。 call_later(s, callback, *args)
方法使用秒而不是毫秒。
import asyncio
my_var = 0
def update_forever(the_loop):
global my_var
print(my_var)
my_var += 1
# exit logic could be placed here
the_loop.call_later(3, update_forever, the_loop) # the method adds a delayed callback on completion
event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
此解决方案使用来自 Fernando José Esteves de Souza, the drifting workaround from Wojciech Migda 的装饰概念和超级class 以尽可能生成最优雅的代码来处理异步周期函数。
没有threading.Thread
解决方案由以下文件组成:
periodic_async_thread.py
以 class 为底子 class
a_periodic_thread.py
示例 subclass
run_me.py
示例实例化和 运行
文件中的PeriodicAsyncThread
class periodic_async_thread.py
:
import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
一个简单的子示例class APeriodicThread
文件 a_periodic_thread.py
:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
在文件 run_me.py
:
中实例化和 运行 实例 class
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
此代码代表了一种优雅的解决方案,它还减轻了其他解决方案的时间漂移问题。输出类似于:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
与threading.Thread
解决方案由以下文件组成:
async_thread.py
与 canopy 异步线程 class.
periodic_async_thread.py
以 class 为底子 class
a_periodic_thread.py
示例 subclass
run_me.py
示例实例化和 运行
文件async_thread.py
中的AsyncThread
class:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
文件中的PeriodicAsyncThread
class periodic_async_thread.py
:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
一个简单的子示例class APeriodicThread
文件 a_periodic_thread.py
:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
在文件 run_me.py
:
中实例化和 运行 实例 class
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
此代码代表了一种优雅的解决方案,它还减轻了其他解决方案的时间漂移问题。输出类似于:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
对于多种类型的调度,我推荐 APSScheduler,它支持异步。
我将它用于一个简单的 python 进程,我可以使用 docker 启动它,就像一个 cron 一样每周执行一些事情,直到我杀死 docker/process.
我正在从 tornado
迁移到 asyncio
,但找不到与 tornado
的 PeriodicCallback
等效的 asyncio
。 (A PeriodicCallback
有两个参数:运行 的函数和调用之间的毫秒数。)
asyncio
中有这样的等价物吗?- 如果没有,什么是最干净的实现方式,而不 运行 避免在一段时间后获得
RecursionError
的风险?
没有对定期调用的内置支持,没有。
只需创建您自己的调度程序循环即可休眠并执行任何已安排的任务:
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
async for task in scheduled_tasks(time.time()):
await task()
scheduled_tasks()
迭代器应生成在给定时间准备好 运行 的任务。请注意,生成时间表并启动所有任务理论上可能需要超过 1 秒的时间;这里的想法是调度程序产生自上次检查以来应该开始的所有任务。
对于低于 3.5 的 Python 版本:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
对于 Python 3.5 及更高版本:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
当您觉得您的 asyncio 程序 "in background" 应该发生某些事情时,asyncio.Task
可能是实现它的好方法。您可以阅读
下面是 class 的可能实现,它会周期性地执行一些函数:
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
我们来测试一下:
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
正如您在 start
上看到的那样,我们只是开始调用一些函数并在无限循环中休眠一段时间的任务。在 stop
我们只是取消了那个任务。请注意,该任务应在程序完成时停止。
更重要的一点是您的回调不应花费太多时间来执行(否则它会冻结您的事件循环)。如果你打算调用一些 long-运行 func
,你可能需要
基于
import time
import asyncio
@asyncio.coroutine
def periodic(period):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * period - time.time(), 0)
g = g_tick()
while True:
print('periodic', time.time())
yield from asyncio.sleep(next(g))
loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
带有装饰器的替代版本 python 3.7
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
一个可能有用的变体:如果您希望在上次执行结束和下一次执行开始之间每隔 n 秒而不是 n 秒进行一次重复调用,并且您不希望调用重叠及时,以下更简单:
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
还有一个使用它来 运行 在后台执行几个任务的示例:
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这就是我用 asyncio 测试我的周期性回调理论所做的。我没有使用 Tornado 的经验,所以我不确定定期回调如何与它一起工作。不过,我习惯于在 Tkinter 中使用 after(ms, callback)
方法,这就是我想出的方法。 While True:
即使它是异步的(比全局更严重),我看起来也很难看。 call_later(s, callback, *args)
方法使用秒而不是毫秒。
import asyncio
my_var = 0
def update_forever(the_loop):
global my_var
print(my_var)
my_var += 1
# exit logic could be placed here
the_loop.call_later(3, update_forever, the_loop) # the method adds a delayed callback on completion
event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
此解决方案使用来自 Fernando José Esteves de Souza, the drifting workaround from Wojciech Migda 的装饰概念和超级class 以尽可能生成最优雅的代码来处理异步周期函数。
没有threading.Thread
解决方案由以下文件组成:
periodic_async_thread.py
以 class 为底子 classa_periodic_thread.py
示例 subclassrun_me.py
示例实例化和 运行
文件中的PeriodicAsyncThread
class periodic_async_thread.py
:
import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
一个简单的子示例class APeriodicThread
文件 a_periodic_thread.py
:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
在文件 run_me.py
:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
此代码代表了一种优雅的解决方案,它还减轻了其他解决方案的时间漂移问题。输出类似于:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
与threading.Thread
解决方案由以下文件组成:
async_thread.py
与 canopy 异步线程 class.periodic_async_thread.py
以 class 为底子 classa_periodic_thread.py
示例 subclassrun_me.py
示例实例化和 运行
文件async_thread.py
中的AsyncThread
class:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
文件中的PeriodicAsyncThread
class periodic_async_thread.py
:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
一个简单的子示例class APeriodicThread
文件 a_periodic_thread.py
:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
在文件 run_me.py
:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
此代码代表了一种优雅的解决方案,它还减轻了其他解决方案的时间漂移问题。输出类似于:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
对于多种类型的调度,我推荐 APSScheduler,它支持异步。
我将它用于一个简单的 python 进程,我可以使用 docker 启动它,就像一个 cron 一样每周执行一些事情,直到我杀死 docker/process.