一系列阻塞调用的并发未来轮询

Concurrent future polling of series of blocking calls

我正在尝试为 Python 中的一个长 运行ning 任务生成一个轮询机制。为此,我使用并发 Future 并使用 .done() 进行轮询。该任务存在许多本身阻塞的迭代,我将其包装在一个异步函数中。我调用第三方软件时无权访问阻塞函数的代码。这是我当前方法的一个最小示例:

import asyncio
import time

async def blocking_iteration():
    time.sleep(1)

async def long_running():
    for i in range(5):
        print(f"sleeping {i}")
        await blocking_iteration()

async def poll_run():
    future = asyncio.ensure_future(long_running())
    while not future.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    future.result()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()

结果是:

before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling

根据我目前对 Python 中异步机制的理解,我曾预计循环会在第一次睡眠后解除阻塞,return 控制将返回到 [=33] 的循环=] await 语句并且只会 运行 在后续轮询之后 long_running 函数的第二次迭代。 所以期望的输出是这样的:

before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling

这可以通过当前方法以某种方式实现,还是可以通过其他方式实现?

编辑

感谢@drjackild 能够通过更改解决它

async def blocking_iteration():
    time.sleep(1)

进入

def blocking():
    time.sleep(1)

async def blocking_iteration():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, blocking)

time 是同步库,执行时会阻塞整个主线程。如果你的程序中有这样的阻塞调用,你可以避免阻塞线程或进程池执行器(你可以阅读它 here)。或者,将 blocking_iteration 更改为使用 asyncio.sleep 而不是 time.sleep

更新。为了清楚起见,这里是非阻塞版本,它使用 loop.run_in_executor 和默认执行程序。请注意,blocking_iteration 现在没有 async

import asyncio
import concurrent.futures
import time

def blocking_iteration():
    time.sleep(1)

async def long_running():
    loop = asyncio.get_event_loop()
    for i in range(5):
        print(f"sleeping {i}")
        await loop.run_in_executor(None, blocking_iteration)

async def poll_run():
    task = asyncio.create_task(long_running())
    while not task.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    print(task.result())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()