Trying to understand how Tasks works with blocking calls

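I recently started digging into the asyncio library, with the goal of replacing a large thread-based application with an asynchronous one.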

Reading the asyncio documentation, I came across an example in which create_task is used. Since I'm stuck on Python 3.6 for now, I replaced the create_task call with ensure_future, which gives the following code:

# Python 3.6
import asyncio
import time


async def say_after(delay, what):
    print(f"start {what}")  # Added for better visualization of what is happening
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1 = asyncio.ensure_future(
        say_after(1, 'hello'))

    task2 = asyncio.ensure_future(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()

The output is:

started at 15:23:11
start hello
start world
hello
world
finished at 15:23:13
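To make the interleaving visible without reading wall-clock timestamps, here is a minimal sketch of the same pattern that records each step in a list and measures elapsed time with time.monotonic(). The shorter delays (0.2 s and 0.4 s), the `events` list and the `run_demo` helper are assumptions added for illustration; otherwise it follows the example above and only uses calls available on Python 3.6.

```python
import asyncio
import time


async def say_after(delay, what, events):
    events.append(f"start {what}")
    await asyncio.sleep(delay)  # suspends this task so the loop can run others
    events.append(what)


async def main(events):
    # ensure_future wraps each coroutine in a Task scheduled on the running loop;
    # the tasks only start running once main() itself awaits something
    task1 = asyncio.ensure_future(say_after(0.2, "hello", events))
    task2 = asyncio.ensure_future(say_after(0.4, "world", events))
    start = time.monotonic()
    await task1
    await task2
    return time.monotonic() - start


def run_demo():
    events = []
    loop = asyncio.new_event_loop()
    try:
        elapsed = loop.run_until_complete(main(events))
    finally:
        loop.close()
    return events, elapsed


events, elapsed = run_demo()
print(events)  # ['start hello', 'start world', 'hello', 'world']
print(f"elapsed: {elapsed:.2f}s")  # about 0.4 s: the longest delay, not the sum
```

Both "start" entries come before either result, and the total time tracks the longest delay, which is the behaviour the timestamps in the output above show.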

As far as I understand, the event loop:

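With all that said, one of the requirements of my application is that we have some blocking calls that need to be turned into coroutines. I created this mock code to test the run_in_executor function with that goal in mind: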

# Python 3.6
import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def normal_operations():
    print("start blocking")
    time.sleep(1)
    print("ended blocking")


async def async_operation():
    print("start non blocking")
    await asyncio.sleep(2)
    print("ended non blocking")


async def main():
    loop = asyncio.get_event_loop()

    print(f"started at {time.strftime('%X')}")

    with ThreadPoolExecutor() as pool:
        task1 = asyncio.ensure_future(
            loop.run_in_executor(pool, normal_operations)
        )

    task2 = asyncio.ensure_future(
        async_operation()
    )

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()

I expected the output to be similar to the first example, but when I run this code, the output is:

started at 15:28:06
start blocking
ended blocking
start non blocking
ended non blocking
finished at 15:28:09

The two functions run sequentially; unlike in the first example, the "start" print calls do not appear one right after the other.

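I'm not sure what I'm doing wrong. My guess is that run_in_executor doesn't really create an asynchronous call, or maybe I just implemented it incorrectly; I don't know.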

OK, I think I see what I got wrong.

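I was awaiting the operations outside the ThreadPoolExecutor's with block, and its __exit__ dunder calls the shutdown function with the wait=True argument. That call blocks the event loop's thread until normal_operations has finished, and only then is async_operation scheduled, so basically the executor was blocking my code.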
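The effect can be reproduced without asyncio at all. The minimal sketch below uses a hypothetical `blocking_work` function with a 0.3 s sleep and times two things: how long `submit()` takes, and how long it takes to leave the `with` block. Submission returns immediately, but the exit waits for the worker; inside a coroutine, that wait happens on the event loop's own thread, so nothing else on the loop can run in the meantime.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work():
    # Hypothetical stand-in for a blocking call
    time.sleep(0.3)
    return "done"


start = time.monotonic()
with ThreadPoolExecutor() as pool:
    future = pool.submit(blocking_work)
    submitted_after = time.monotonic() - start  # submit() does not wait
# Leaving the with block runs pool.shutdown(wait=True), which blocks here
exited_after = time.monotonic() - start

print(f"submit returned after {submitted_after:.2f}s")  # close to 0
print(f"with block exited after {exited_after:.2f}s")  # at least 0.3 s
print(future.result())  # done
```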

Fixed code:

async def main():
    loop = asyncio.get_event_loop()

    print(f"started at {time.strftime('%X')}")

    pool = ThreadPoolExecutor()

    task1 = asyncio.ensure_future(loop.run_in_executor(pool, normal_operations))
    task2 = asyncio.ensure_future(async_operation())

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

    pool.shutdown(wait=True)

Expected output:

started at 16:20:48
start non blocking
start blocking
ended blocking
ended non blocking
finished at 16:20:50
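A variant of the fix that keeps the with statement is to do all of the awaiting inside the block, so that by the time __exit__ calls shutdown(wait=True) the worker has already finished. The minimal sketch below does that and uses asyncio.gather to await both operations together. The shorter delays (0.2 s and 0.4 s), the `events` list passed into each function and the `run_demo` helper are assumptions added so the interleaving can be checked. Because a worker thread and the event loop race to record their start, the order of the two "start" entries is not guaranteed, which fits the expected output above printing "start non blocking" first.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def normal_operations(events):
    events.append("start blocking")
    time.sleep(0.2)  # blocks only the worker thread, not the event loop
    events.append("ended blocking")


async def async_operation(events):
    events.append("start non blocking")
    await asyncio.sleep(0.4)
    events.append("ended non blocking")


async def main(events):
    loop = asyncio.get_event_loop()
    start = time.monotonic()
    with ThreadPoolExecutor() as pool:
        # Await both operations while the pool is still open, so the
        # shutdown(wait=True) run by __exit__ has nothing left to wait for
        await asyncio.gather(
            loop.run_in_executor(pool, normal_operations, events),
            async_operation(events),
        )
    return time.monotonic() - start


def run_demo():
    events = []
    loop = asyncio.new_event_loop()
    try:
        elapsed = loop.run_until_complete(main(events))
    finally:
        loop.close()
    return events, elapsed


events, elapsed = run_demo()
print(events)
print(f"elapsed: {elapsed:.2f}s")  # about 0.4 s rather than 0.6 s
```

Compared with calling pool.shutdown(wait=True) by hand, the with statement also shuts the pool down if one of the awaited operations raises.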