如何使用 asyncio 安排和取消任务
How to schedule and cancel tasks with asyncio
我正在编写客户端-服务器应用程序。连接时,客户端向服务器发送一个 "heartbeat" 信号,例如每秒发送一次。
在服务器端,我需要一种机制,我可以在其中添加要异步执行的任务(或协程或其他东西)。此外,当客户端停止发送 "heartbeat" 信号时,我想取消客户端的任务。
换句话说,当服务器启动一个任务时,它有一种超时或 ttl,例如 3 秒。当服务器收到 "heartbeat" 信号时,它会将计时器再重置 3 秒,直到任务完成或客户端断开连接(停止发送信号)。
这是 example 从 pymotw.com 上的 asyncio 教程取消任务。但是这里任务在event_loop开始前就取消了,不适合我
import asyncio
async def task_func():
print('in task_func')
return 'the result'
event_loop = asyncio.get_event_loop()
try:
print('creating task')
task = event_loop.create_task(task_func())
print('canceling task')
task.cancel()
print('entering event loop')
event_loop.run_until_complete(task)
print('task: {!r}'.format(task))
except asyncio.CancelledError:
print('caught error from cancelled task')
else:
print('task result: {!r}'.format(task.result()))
finally:
event_loop.close()
您可以使用 asyncio
Task
包装器通过 ensure_future()
方法执行任务。
ensure_future
会自动将协程包装在 Task
包装器中,并将其附加到事件循环中。然后 Task
包装器还将确保协程 'cranks-over' 从 await
到 await
语句(或直到协程完成)。
换句话说,只需将常规协程传递给 ensure_future
并将生成的 Task
对象分配给变量即可。然后,您可以在需要停止时调用 Task.cancel()
。
import asyncio
async def task_func():
print('in task_func')
# if the task needs to run for a while you'll need an await statement
# to provide a pause point so that other coroutines can run in the mean time
await some_db_or_long_running_background_coroutine()
# or if this is a once-off thing, then return the result,
# but then you don't really need a Task wrapper...
# return 'the result'
async def my_app():
my_task = None
while True:
await asyncio.sleep(0)
# listen for trigger / heartbeat
if heartbeat and my_task is None:
my_task = asyncio.ensure_future(task_func())
# also listen for termination of hearbeat / connection
elif not heartbeat and my_task:
if not my_task.cancelled():
my_task.cancel()
else:
my_task = None
run_app = asyncio.ensure_future(my_app())
event_loop = asyncio.get_event_loop()
event_loop.run_forever()
请注意,任务适用于长时间 运行 需要在不中断主流程的情况下在后台继续工作的任务。如果您只需要一个快速的一次性方法,那么直接调用该函数即可。
我正在编写客户端-服务器应用程序。连接时,客户端向服务器发送一个 "heartbeat" 信号,例如每秒发送一次。 在服务器端,我需要一种机制,我可以在其中添加要异步执行的任务(或协程或其他东西)。此外,当客户端停止发送 "heartbeat" 信号时,我想取消客户端的任务。
换句话说,当服务器启动一个任务时,它有一种超时或 ttl,例如 3 秒。当服务器收到 "heartbeat" 信号时,它会将计时器再重置 3 秒,直到任务完成或客户端断开连接(停止发送信号)。
这是 example 从 pymotw.com 上的 asyncio 教程取消任务。但是这里任务在event_loop开始前就取消了,不适合我
import asyncio
async def task_func():
print('in task_func')
return 'the result'
event_loop = asyncio.get_event_loop()
try:
print('creating task')
task = event_loop.create_task(task_func())
print('canceling task')
task.cancel()
print('entering event loop')
event_loop.run_until_complete(task)
print('task: {!r}'.format(task))
except asyncio.CancelledError:
print('caught error from cancelled task')
else:
print('task result: {!r}'.format(task.result()))
finally:
event_loop.close()
您可以使用 asyncio
Task
包装器通过 ensure_future()
方法执行任务。
ensure_future
会自动将协程包装在 Task
包装器中,并将其附加到事件循环中。然后 Task
包装器还将确保协程 'cranks-over' 从 await
到 await
语句(或直到协程完成)。
换句话说,只需将常规协程传递给 ensure_future
并将生成的 Task
对象分配给变量即可。然后,您可以在需要停止时调用 Task.cancel()
。
import asyncio
async def task_func():
print('in task_func')
# if the task needs to run for a while you'll need an await statement
# to provide a pause point so that other coroutines can run in the mean time
await some_db_or_long_running_background_coroutine()
# or if this is a once-off thing, then return the result,
# but then you don't really need a Task wrapper...
# return 'the result'
async def my_app():
my_task = None
while True:
await asyncio.sleep(0)
# listen for trigger / heartbeat
if heartbeat and my_task is None:
my_task = asyncio.ensure_future(task_func())
# also listen for termination of hearbeat / connection
elif not heartbeat and my_task:
if not my_task.cancelled():
my_task.cancel()
else:
my_task = None
run_app = asyncio.ensure_future(my_app())
event_loop = asyncio.get_event_loop()
event_loop.run_forever()
请注意,任务适用于长时间 运行 需要在不中断主流程的情况下在后台继续工作的任务。如果您只需要一个快速的一次性方法,那么直接调用该函数即可。