有什么办法可以保证asyncio.Task立即启动?

Is there any way to guarantee asyncio.Task will be started immediately?

import asyncio


l = asyncio.Lock()

async def test():
    print('locked' if l.locked() else 'unlocked')

    await l.acquire()
    # await asyncio.ensure_future(l.acquire())

    await asyncio.sleep(1)
    l.release()

async def main():
    await asyncio.gather(test(), test())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

我们启动两个 test() 协程,第一个立即通过 l.acquire() 锁定 Lock,第二个打印 locked 状态。输出:

unlocked
locked

如果您注释 await l.acquire() 行并取消注释下一行,一切都会改变。输出将是:

unlocked
unlocked

这是因为包裹在任务中的 l.acquire() 在第二个 test() 启动后启动。

有什么方法可以使 l.acquire() 任务在第二个 test() 之前尽快启动(并获得与原始代码相同的输出)?

看来我找到了解决方案。我们需要在第一个任务启动时暂停第二个任务锁定检查的全局锁:

import asyncio


l = asyncio.Lock()
check_lock = asyncio.Lock()

async def test():
    async with check_lock:
        print('locked' if l.locked() else 'unlocked')
        await asyncio.ensure_future(l.acquire())
        print('now', 'locked' if l.locked() else 'unlocked')

    await asyncio.sleep(1)
    l.release()

async def main():
    await asyncio.gather(test(), test())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

unlocked
now locked
locked
# 1 second delay here
now locked

但是这里我们遇到了另一个问题:当第一个任务完成时第二个任务会被挂起,这就是为什么我们在第二次锁定之前有 1 秒的延迟。在原始示例中很难看到(因为 test() 等待单锁),这就是我添加第二次打印的原因。但是 立即启动协程可能很重要:我们应该只锁定创建任务,而不是等待它。创建任务本身不会立即启动该任务(和 l.acquire()),我们应该 return 控制事件循环。可以通过await asyncio.sleep(0)来完成。这是原始代码的最终解决方案:

import asyncio


l = asyncio.Lock()
check_lock = asyncio.Lock()

async def test():
    async with check_lock:
        print('locked' if l.locked() else 'unlocked')
        task = asyncio.ensure_future(l.acquire())  # only create task
        await asyncio.sleep(0)  # return control to event loop, it allows lock to be locked before task completed
        print('now', 'locked' if l.locked() else 'unlocked')  # would be printed immediately

    await task  # and now we can await for task done
    await asyncio.sleep(1)
    l.release()

async def main():
    await asyncio.gather(test(), test())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

unlocked
now locked
locked
# no delay here
now locked