一系列阻塞调用的并发未来轮询
Concurrent future polling of series of blocking calls
我正在尝试为 Python 中的一个长 运行ning 任务生成一个轮询机制。为此,我使用并发 Future 并使用 .done()
进行轮询。该任务存在许多本身阻塞的迭代,我将其包装在一个异步函数中。我调用第三方软件时无权访问阻塞函数的代码。这是我当前方法的一个最小示例:
import asyncio
import time
async def blocking_iteration():
time.sleep(1)
async def long_running():
for i in range(5):
print(f"sleeping {i}")
await blocking_iteration()
async def poll_run():
future = asyncio.ensure_future(long_running())
while not future.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
future.result()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()
结果是:
before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling
根据我目前对 Python 中异步机制的理解,我曾预计循环会在第一次睡眠后解除阻塞,return 控制将返回到 [=33] 的循环=] await
语句并且只会 运行 在后续轮询之后 long_running 函数的第二次迭代。
所以期望的输出是这样的:
before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling
这可以通过当前方法以某种方式实现,还是可以通过其他方式实现?
编辑
感谢@drjackild 能够通过更改解决它
async def blocking_iteration():
time.sleep(1)
进入
def blocking():
time.sleep(1)
async def blocking_iteration():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, blocking)
time
是同步库,执行时会阻塞整个主线程。如果你的程序中有这样的阻塞调用,你可以避免阻塞线程或进程池执行器(你可以阅读它 here)。或者,将 blocking_iteration
更改为使用 asyncio.sleep
而不是 time.sleep
更新。为了清楚起见,这里是非阻塞版本,它使用 loop.run_in_executor
和默认执行程序。请注意,blocking_iteration
现在没有 async
import asyncio
import concurrent.futures
import time
def blocking_iteration():
time.sleep(1)
async def long_running():
loop = asyncio.get_event_loop()
for i in range(5):
print(f"sleeping {i}")
await loop.run_in_executor(None, blocking_iteration)
async def poll_run():
task = asyncio.create_task(long_running())
while not task.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
print(task.result())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()
我正在尝试为 Python 中的一个长 运行ning 任务生成一个轮询机制。为此,我使用并发 Future 并使用 .done()
进行轮询。该任务存在许多本身阻塞的迭代,我将其包装在一个异步函数中。我调用第三方软件时无权访问阻塞函数的代码。这是我当前方法的一个最小示例:
import asyncio
import time
async def blocking_iteration():
time.sleep(1)
async def long_running():
for i in range(5):
print(f"sleeping {i}")
await blocking_iteration()
async def poll_run():
future = asyncio.ensure_future(long_running())
while not future.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
future.result()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()
结果是:
before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling
根据我目前对 Python 中异步机制的理解,我曾预计循环会在第一次睡眠后解除阻塞,return 控制将返回到 [=33] 的循环=] await
语句并且只会 运行 在后续轮询之后 long_running 函数的第二次迭代。
所以期望的输出是这样的:
before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling
这可以通过当前方法以某种方式实现,还是可以通过其他方式实现?
编辑
感谢@drjackild 能够通过更改解决它
async def blocking_iteration():
time.sleep(1)
进入
def blocking():
time.sleep(1)
async def blocking_iteration():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, blocking)
time
是同步库,执行时会阻塞整个主线程。如果你的程序中有这样的阻塞调用,你可以避免阻塞线程或进程池执行器(你可以阅读它 here)。或者,将 blocking_iteration
更改为使用 asyncio.sleep
而不是 time.sleep
更新。为了清楚起见,这里是非阻塞版本,它使用 loop.run_in_executor
和默认执行程序。请注意,blocking_iteration
现在没有 async
import asyncio
import concurrent.futures
import time
def blocking_iteration():
time.sleep(1)
async def long_running():
loop = asyncio.get_event_loop()
for i in range(5):
print(f"sleeping {i}")
await loop.run_in_executor(None, blocking_iteration)
async def poll_run():
task = asyncio.create_task(long_running())
while not task.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
print(task.result())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()