是否可以在异步 python 中挂起和重启任务?
Is it possible to suspend and restart tasks in async python?
这个问题应该很简单,但我找不到任何相关信息。
我有一个异步 python 程序,其中包含一个相当长的 运行 任务,我希望能够在任意点暂停和重新启动该任务(当然是任意的,意思是在任何有 await 关键字的地方).
我希望有一些类似于 task.suspend()
和 task.resume()
的东西,但似乎没有。
在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己执行此操作?我不想在每次等待之前放置一个 event.wait()
...
谢谢
您所要求的是可能的,但并非微不足道。首先,请注意,您永远不能在 every await
上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep()
,或 stream.read()
没有数据准备好 return。等待一个协程会立即开始执行它,如果协程可以 return 立即执行,它就不会进入事件循环。 await
仅在被等待者(或 its 被等待者等)请求时才挂起到事件循环。这些问题的更多细节:, , , .
考虑到这一点,您可以使用 中的技术来拦截协程的每次恢复,并使用额外的代码检查任务是否已暂停,如果是,则在继续之前等待恢复事件.
import asyncio
class Suspendable:
def __init__(self, target):
self._target = target
self._can_run = asyncio.Event()
self._can_run.set()
self._task = asyncio.ensure_future(self)
def __await__(self):
target_iter = self._target.__await__()
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
# This "while" emulates yield from.
while True:
# wait for can_run before resuming execution of self._target
try:
while not self._can_run.is_set():
yield from self._can_run.wait().__await__()
except BaseException as err:
send, message = iter_throw, err
# continue with our regular program
try:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err
def suspend(self):
self._can_run.clear()
def is_suspended(self):
return not self._can_run.is_set()
def resume(self):
self._can_run.set()
def get_task(self):
return self._task
测试:
import time
async def heartbeat():
while True:
print(time.time())
await asyncio.sleep(.2)
async def main():
task = Suspendable(heartbeat())
for i in range(5):
print('suspending')
task.suspend()
await asyncio.sleep(1)
print('resuming')
task.resume()
await asyncio.sleep(1)
asyncio.run(main())
这个问题应该很简单,但我找不到任何相关信息。
我有一个异步 python 程序,其中包含一个相当长的 运行 任务,我希望能够在任意点暂停和重新启动该任务(当然是任意的,意思是在任何有 await 关键字的地方).
我希望有一些类似于 task.suspend()
和 task.resume()
的东西,但似乎没有。
在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己执行此操作?我不想在每次等待之前放置一个 event.wait()
...
谢谢
您所要求的是可能的,但并非微不足道。首先,请注意,您永远不能在 every await
上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep()
,或 stream.read()
没有数据准备好 return。等待一个协程会立即开始执行它,如果协程可以 return 立即执行,它就不会进入事件循环。 await
仅在被等待者(或 its 被等待者等)请求时才挂起到事件循环。这些问题的更多细节:
考虑到这一点,您可以使用
import asyncio
class Suspendable:
def __init__(self, target):
self._target = target
self._can_run = asyncio.Event()
self._can_run.set()
self._task = asyncio.ensure_future(self)
def __await__(self):
target_iter = self._target.__await__()
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
# This "while" emulates yield from.
while True:
# wait for can_run before resuming execution of self._target
try:
while not self._can_run.is_set():
yield from self._can_run.wait().__await__()
except BaseException as err:
send, message = iter_throw, err
# continue with our regular program
try:
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err
def suspend(self):
self._can_run.clear()
def is_suspended(self):
return not self._can_run.is_set()
def resume(self):
self._can_run.set()
def get_task(self):
return self._task
测试:
import time
async def heartbeat():
while True:
print(time.time())
await asyncio.sleep(.2)
async def main():
task = Suspendable(heartbeat())
for i in range(5):
print('suspending')
task.suspend()
await asyncio.sleep(1)
print('resuming')
task.resume()
await asyncio.sleep(1)
asyncio.run(main())