Python 3.5 async for 块 ioloop
Python 3.5 async for blocks the ioloop
我有一个带有两个处理程序的简单 aiohttp-server。
第一个在 async for
循环中进行一些计算。第二个只是 returns 文本响应。 not_so_long_operation
returns 递归实现最慢的第 30 个斐波那契数,大约需要一秒钟。
def not_so_long_operation():
return fib(30)
class arange:
def __init__(self, n):
self.n = n
self.i = 0
async def __aiter__(self):
return self
async def __anext__(self):
i = self.i
self.i += 1
if self.i <= self.n:
return i
else:
raise StopAsyncIteration
# GET /
async def index(request):
print('request!')
l = []
async for i in arange(20):
print(i)
l.append(not_so_long_operation())
return aiohttp.web.Response(text='%d\n' % l[0])
# GET /lol/
async def lol(request):
print('request!')
return aiohttp.web.Response(text='just respond\n')
当我尝试获取 /
然后 /lol/
时,它仅在第一个完成时才对第二个进行响应。
我做错了什么以及如何让索引处理程序在每次迭代时释放 ioloop?
这里真的不需要异步迭代器。相反,您可以简单地将控制权交还给循环内的事件循环。在 python 3.4 中,这是通过使用简单的 yield
:
@asyncio.coroutine
def index(self):
for i in range(20):
not_so_long_operation()
yield
在 python 3.5 中,您可以定义一个 Empty
基本上做同样事情的对象:
class Empty:
def __await__(self):
yield
然后使用 await
语法:
async def index(request):
for i in range(20):
not_so_long_operation()
await Empty()
或者直接使用asyncio.sleep(0) that has been recently optimized:
async def index(request):
for i in range(20):
not_so_long_operation()
await asyncio.sleep(0)
您还可以 运行 在线程中使用 default executor:
not_so_long_operation
async def index(request, loop):
for i in range(20):
await loop.run_in_executor(None, not_so_long_operation)
由于 fib(30)
是 CPU 绑定并共享少量数据,您可能应该使用 ProcessPoolExecutor
(而不是 ThreadPoolExecutor
):
async def index(request):
loop = request.app.loop
executor = request.app["executor"]
result = await loop.run_in_executor(executor, fib, 30)
return web.Response(text="%d" % result)
在创建 app
时设置 executor
:
app = Application(...)
app["exector"] = ProcessPoolExector()
您的示例没有用于在任务之间切换的 屈服点 (await
语句)。
异步迭代器 允许 在 __aiter__
/__anext__
中使用 await
但不要将其自动插入到您的代码中。
说,
class arange:
def __init__(self, n):
self.n = n
self.i = 0
async def __aiter__(self):
return self
async def __anext__(self):
i = self.i
self.i += 1
if self.i <= self.n:
await asyncio.sleep(0) # insert yield point
return i
else:
raise StopAsyncIteration
应该会按预期工作。
在实际应用中,您很可能不需要 await asyncio.sleep(0)
调用,因为您将等待数据库访问和类似活动。
我有一个带有两个处理程序的简单 aiohttp-server。
第一个在 async for
循环中进行一些计算。第二个只是 returns 文本响应。 not_so_long_operation
returns 递归实现最慢的第 30 个斐波那契数,大约需要一秒钟。
def not_so_long_operation():
return fib(30)
class arange:
def __init__(self, n):
self.n = n
self.i = 0
async def __aiter__(self):
return self
async def __anext__(self):
i = self.i
self.i += 1
if self.i <= self.n:
return i
else:
raise StopAsyncIteration
# GET /
async def index(request):
print('request!')
l = []
async for i in arange(20):
print(i)
l.append(not_so_long_operation())
return aiohttp.web.Response(text='%d\n' % l[0])
# GET /lol/
async def lol(request):
print('request!')
return aiohttp.web.Response(text='just respond\n')
当我尝试获取 /
然后 /lol/
时,它仅在第一个完成时才对第二个进行响应。
我做错了什么以及如何让索引处理程序在每次迭代时释放 ioloop?
这里真的不需要异步迭代器。相反,您可以简单地将控制权交还给循环内的事件循环。在 python 3.4 中,这是通过使用简单的 yield
:
@asyncio.coroutine
def index(self):
for i in range(20):
not_so_long_operation()
yield
在 python 3.5 中,您可以定义一个 Empty
基本上做同样事情的对象:
class Empty:
def __await__(self):
yield
然后使用 await
语法:
async def index(request):
for i in range(20):
not_so_long_operation()
await Empty()
或者直接使用asyncio.sleep(0) that has been recently optimized:
async def index(request):
for i in range(20):
not_so_long_operation()
await asyncio.sleep(0)
您还可以 运行 在线程中使用 default executor:
not_so_long_operation
async def index(request, loop):
for i in range(20):
await loop.run_in_executor(None, not_so_long_operation)
由于 fib(30)
是 CPU 绑定并共享少量数据,您可能应该使用 ProcessPoolExecutor
(而不是 ThreadPoolExecutor
):
async def index(request):
loop = request.app.loop
executor = request.app["executor"]
result = await loop.run_in_executor(executor, fib, 30)
return web.Response(text="%d" % result)
在创建 app
时设置 executor
:
app = Application(...)
app["exector"] = ProcessPoolExector()
您的示例没有用于在任务之间切换的 屈服点 (await
语句)。
异步迭代器 允许 在 __aiter__
/__anext__
中使用 await
但不要将其自动插入到您的代码中。
说,
class arange:
def __init__(self, n):
self.n = n
self.i = 0
async def __aiter__(self):
return self
async def __anext__(self):
i = self.i
self.i += 1
if self.i <= self.n:
await asyncio.sleep(0) # insert yield point
return i
else:
raise StopAsyncIteration
应该会按预期工作。
在实际应用中,您很可能不需要 await asyncio.sleep(0)
调用,因为您将等待数据库访问和类似活动。