试图了解 Tasks 如何处理阻塞调用
Trying to understand how Tasks works with blocking calls
我最近开始研究 asyncio
库,目标是用异步替换基于大线程的应用程序。
阅读 asyncio
documentantion 我偶然发现了一个例子
create_task
正在使用中。因为我现在坚持使用 python 3.6,所以我更改了 create_task
调用
ensure_future
,生成当前代码:
# Python 3.6
import asyncio
import time
async def say_after(delay, what):
print(f"start {what}") # Added for better vizualization of what is happening
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.ensure_future(
say_after(1, 'hello'))
task2 = asyncio.ensure_future(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
并输出:
started at 15:23:11
start hello
start world
hello
world
finished at 15:23:13
据我所知,事件循环:
- 首先开始任务
task1
;
- 点击
asyncio.sleep
后,它会将上下文更改为第二个 task2
;和
- 当第一个睡眠结束时,它变为
task1
,当 task2
的睡眠调用结束时同样发生同样的事情。
综上所述,我的应用程序的要求之一是我们有一些需要转换为协程的阻塞调用。
我创建了这个模拟代码来测试 run_in_executor
函数,目标是:
# Python 3.6
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def normal_operations():
print("start blocking")
time.sleep(1)
print("ended blocking")
async def async_operation():
print("start non blocking")
await asyncio.sleep(2)
print("ended non blocking")
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
with ThreadPoolExecutor() as pool:
task1 = asyncio.ensure_future(
loop.run_in_executor(pool, normal_operations)
)
task2 = asyncio.ensure_future(
async_operation()
)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
我预计输出会与第一个示例类似,但是当我 运行 此代码时,输出是:
started at 15:28:06
start blocking
ended blocking
start non blocking
ended non blocking
finished at 15:28:09
这两个函数 运行 按顺序排列,不像第一个例子那样,开始 print
调用一个接一个地调用。
我不确定我做错了什么,我的猜测是 run_in_executor
函数并没有真正创建一个异步调用,或者也许我只是错误地实现了它,我不知道。
好的,我想我弄错了。
我正在等待 ThreadPoolExecutor
之外的非阻塞操作,在 __exit__
dunder 中正在使用 wait=True
参数调用 shutdown
函数,所以基本上执行者阻止了我的代码。
固定码:
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
pool = ThreadPoolExecutor()
task1 = asyncio.ensure_future(loop.run_in_executor(pool, normal_operations))
task2 = asyncio.ensure_future(async_operation())
await task1
await task2
print(f"finished at {time.strftime('%X')}")
pool.shutdown(wait=True)
预期输出:
started at 16:20:48
start non blocking
start blocking
ended blocking
ended non blocking
finished at 16:20:50
我最近开始研究 asyncio
库,目标是用异步替换基于大线程的应用程序。
阅读 asyncio
documentantion 我偶然发现了一个例子
create_task
正在使用中。因为我现在坚持使用 python 3.6,所以我更改了 create_task
调用
ensure_future
,生成当前代码:
# Python 3.6
import asyncio
import time
async def say_after(delay, what):
print(f"start {what}") # Added for better vizualization of what is happening
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.ensure_future(
say_after(1, 'hello'))
task2 = asyncio.ensure_future(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
并输出:
started at 15:23:11
start hello
start world
hello
world
finished at 15:23:13
据我所知,事件循环:
- 首先开始任务
task1
; - 点击
asyncio.sleep
后,它会将上下文更改为第二个task2
;和 - 当第一个睡眠结束时,它变为
task1
,当task2
的睡眠调用结束时同样发生同样的事情。
综上所述,我的应用程序的要求之一是我们有一些需要转换为协程的阻塞调用。
我创建了这个模拟代码来测试 run_in_executor
函数,目标是:
# Python 3.6
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def normal_operations():
print("start blocking")
time.sleep(1)
print("ended blocking")
async def async_operation():
print("start non blocking")
await asyncio.sleep(2)
print("ended non blocking")
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
with ThreadPoolExecutor() as pool:
task1 = asyncio.ensure_future(
loop.run_in_executor(pool, normal_operations)
)
task2 = asyncio.ensure_future(
async_operation()
)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
我预计输出会与第一个示例类似,但是当我 运行 此代码时,输出是:
started at 15:28:06
start blocking
ended blocking
start non blocking
ended non blocking
finished at 15:28:09
这两个函数 运行 按顺序排列,不像第一个例子那样,开始 print
调用一个接一个地调用。
我不确定我做错了什么,我的猜测是 run_in_executor
函数并没有真正创建一个异步调用,或者也许我只是错误地实现了它,我不知道。
好的,我想我弄错了。
我正在等待 ThreadPoolExecutor
之外的非阻塞操作,在 __exit__
dunder 中正在使用 wait=True
参数调用 shutdown
函数,所以基本上执行者阻止了我的代码。
固定码:
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
pool = ThreadPoolExecutor()
task1 = asyncio.ensure_future(loop.run_in_executor(pool, normal_operations))
task2 = asyncio.ensure_future(async_operation())
await task1
await task2
print(f"finished at {time.strftime('%X')}")
pool.shutdown(wait=True)
预期输出:
started at 16:20:48
start non blocking
start blocking
ended blocking
ended non blocking
finished at 16:20:50