是否可以在异步 python 中挂起和重启任务?

Is it possible to suspend and restart tasks in async python?

这个问题应该很简单,但我找不到任何相关信息。

我有一个异步 python 程序,其中包含一个相当长的 运行 任务,我希望能够在任意点暂停和重新启动该任务(当然是任意的,意思是在任何有 await 关键字的地方). 我希望有一些类似于 task.suspend()task.resume() 的东西,但似乎没有。 在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己执行此操作?我不想在每次等待之前放置一个 event.wait()...

谢谢

您所要求的是可能的,但并非微不足道。首先,请注意,您永远不能在 every await 上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep(),或 stream.read() 没有数据准备好 return。等待一个协程会立即开始执行它,如果协程可以 return 立即执行,它就不会进入事件循环。 await 仅在被等待者(或 its 被等待者等)请求时才挂起到事件循环。这些问题的更多细节:, , , .

考虑到这一点,您可以使用 中的技术来拦截协程的每次恢复,并使用额外的代码检查任务是否已暂停,如果是,则在继续之前等待恢复事件.

import asyncio

class Suspendable:
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

测试:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())