异步任务意外延迟
asyncio tasks getting unexpectedly defered
我一直在努力学习一些关于 asyncio
的知识,但我遇到了一些意外行为。我设置了一个简单的斐波那契服务器,它支持使用流的多个连接。 fib的计算是递归写的,所以我输入大数就可以模拟long 运行ning的计算。正如预期的那样,long 运行ning 计算会阻塞 I/O,直到 long 运行ning 计算完成。
问题来了。我将斐波那契函数重写为协程。我预计通过从每次递归中产生,控制将回退到事件循环,并且等待 I/O 任务将有机会执行,并且您甚至能够 运行 多个 fib 计算同时。然而,情况似乎并非如此。
代码如下:
import asyncio
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = yield from fib(n)
writer.write('{}\n'.format(result).encode('ascii'))
yield from writer.drain()
writer.close()
print("Closed")
def server(address):
loop = asyncio.get_event_loop()
fib_server = asyncio.start_server(fib_handler, *address, loop=loop)
fib_server = loop.run_until_complete(fib_server)
try:
loop.run_forever()
except KeyboardInterrupt:
print('closing...')
fib_server.close()
loop.run_until_complete(fib_server.wait_closed())
loop.close()
server(('', 25000))
如果您 netcat 到端口 25000 并开始输入数字,则此服务器 运行 非常好。但是,如果您开始一个长 运行ning 计算(例如 35),则在第一个完成之前不会有其他计算 运行。事实上,甚至不会处理额外的连接。
我知道事件循环正在反馈递归 fib
调用的收益,因此控制必须一直下降。但我认为循环会在 "trampolining" 返回 fib 函数之前处理 I/O 队列中的其他调用(例如生成第二个 fib_handler
)。
我确定我一定是误会了什么,或者我忽略了某种错误,但我一辈子都找不到它。
我们将不胜感激您提供的任何见解。
第一个问题是您在 fib_handler
内部调用 yield from fib(n)
。包括 yield from
意味着 fib_handler
将阻塞,直到对 fib(n)
的调用完成,这意味着它无法处理您提供的任何输入,而 fib
是 运行宁。即使您所做的只是 fib
内的 I/O,您也会遇到这个问题。要解决此问题,您应该使用 asyncio.async(fib(n))
(或者最好使用 asyncio.ensure_future(fib(n))
,如果您有足够新的 Python 版本)以使用事件循环安排 fib
,实际上无需阻塞 fib_handler
。从那里,您可以使用 Future.add_done_callback
在准备就绪时将结果写入客户端:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(4)
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = asyncio.async(fib(n))
# Write the result to the client when fib(n) is done.
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")
也就是说,仅此更改仍不能完全解决问题;虽然它将允许多个客户端同时连接和发出命令,但单个客户端仍将获得同步行为。发生这种情况是因为当您直接在协程函数上调用 yield from coro()
时,直到 coro()
(或 coro
调用的另一个协程)实际执行了一些非阻塞 I/O。否则,Python 只会执行 coro
而不会放弃控制权。这是一个有用的性能优化,因为当您的协程实际上不会阻塞 I/O 时将控制权交给事件循环是浪费时间,特别是考虑到 Python 的高函数调用开销。
在你的情况下,fib
永远不会做任何 I/O,所以一旦你在 fib
本身内部调用 yield from fib(n-1)
,事件循环永远不会到达 运行 直到完成递归,这将阻止 fib_handler
从客户端读取任何后续输入,直到完成对 fib
的调用。将对 fib
的调用 全部 包装在 asyncio.async
中可确保每次进行 yield from asyncio.async(fib(...))
调用时都将控制权交给事件循环。当我进行此更改时,除了在 fib_handler
中使用 asyncio.async(fib(n))
之外,我还能够同时处理来自单个客户端的多个输入。这是完整的示例代码:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(4)
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = asyncio.async(fib(n))
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")
Input/Output 在客户端:
dan@dandesk:~$ netcat localhost 25000
35 # This was input
4 # This was input
8 # output
24157817 # output
现在,即使这可行,我也不会使用这个实现,因为它在一个单线程程序中做了一堆 CPU 绑定的工作,该程序也想服务 I/O在同一个线程中。这不会很好地扩展,也不会有理想的性能。相反,我建议在后台进程中使用 loop.run_in_executor
到 运行 对 fib
的调用,这允许异步线程 运行 满负荷运行,也允许我们跨多个内核扩展对 fib
的调用:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
def fib(n):
if n < 1:
return 1
a = fib(n-1)
b = fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(8) # 8 Processes in the pool
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = loop.run_in_executor(executor, fib, n)
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")
我一直在努力学习一些关于 asyncio
的知识,但我遇到了一些意外行为。我设置了一个简单的斐波那契服务器,它支持使用流的多个连接。 fib的计算是递归写的,所以我输入大数就可以模拟long 运行ning的计算。正如预期的那样,long 运行ning 计算会阻塞 I/O,直到 long 运行ning 计算完成。
问题来了。我将斐波那契函数重写为协程。我预计通过从每次递归中产生,控制将回退到事件循环,并且等待 I/O 任务将有机会执行,并且您甚至能够 运行 多个 fib 计算同时。然而,情况似乎并非如此。
代码如下:
import asyncio
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = yield from fib(n)
writer.write('{}\n'.format(result).encode('ascii'))
yield from writer.drain()
writer.close()
print("Closed")
def server(address):
loop = asyncio.get_event_loop()
fib_server = asyncio.start_server(fib_handler, *address, loop=loop)
fib_server = loop.run_until_complete(fib_server)
try:
loop.run_forever()
except KeyboardInterrupt:
print('closing...')
fib_server.close()
loop.run_until_complete(fib_server.wait_closed())
loop.close()
server(('', 25000))
如果您 netcat 到端口 25000 并开始输入数字,则此服务器 运行 非常好。但是,如果您开始一个长 运行ning 计算(例如 35),则在第一个完成之前不会有其他计算 运行。事实上,甚至不会处理额外的连接。
我知道事件循环正在反馈递归 fib
调用的收益,因此控制必须一直下降。但我认为循环会在 "trampolining" 返回 fib 函数之前处理 I/O 队列中的其他调用(例如生成第二个 fib_handler
)。
我确定我一定是误会了什么,或者我忽略了某种错误,但我一辈子都找不到它。
我们将不胜感激您提供的任何见解。
第一个问题是您在 fib_handler
内部调用 yield from fib(n)
。包括 yield from
意味着 fib_handler
将阻塞,直到对 fib(n)
的调用完成,这意味着它无法处理您提供的任何输入,而 fib
是 运行宁。即使您所做的只是 fib
内的 I/O,您也会遇到这个问题。要解决此问题,您应该使用 asyncio.async(fib(n))
(或者最好使用 asyncio.ensure_future(fib(n))
,如果您有足够新的 Python 版本)以使用事件循环安排 fib
,实际上无需阻塞 fib_handler
。从那里,您可以使用 Future.add_done_callback
在准备就绪时将结果写入客户端:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(4)
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = asyncio.async(fib(n))
# Write the result to the client when fib(n) is done.
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")
也就是说,仅此更改仍不能完全解决问题;虽然它将允许多个客户端同时连接和发出命令,但单个客户端仍将获得同步行为。发生这种情况是因为当您直接在协程函数上调用 yield from coro()
时,直到 coro()
(或 coro
调用的另一个协程)实际执行了一些非阻塞 I/O。否则,Python 只会执行 coro
而不会放弃控制权。这是一个有用的性能优化,因为当您的协程实际上不会阻塞 I/O 时将控制权交给事件循环是浪费时间,特别是考虑到 Python 的高函数调用开销。
在你的情况下,fib
永远不会做任何 I/O,所以一旦你在 fib
本身内部调用 yield from fib(n-1)
,事件循环永远不会到达 运行 直到完成递归,这将阻止 fib_handler
从客户端读取任何后续输入,直到完成对 fib
的调用。将对 fib
的调用 全部 包装在 asyncio.async
中可确保每次进行 yield from asyncio.async(fib(...))
调用时都将控制权交给事件循环。当我进行此更改时,除了在 fib_handler
中使用 asyncio.async(fib(n))
之外,我还能够同时处理来自单个客户端的多个输入。这是完整的示例代码:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def fib(n):
if n < 1:
return 1
a = yield from fib(n-1)
b = yield from fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(4)
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = asyncio.async(fib(n))
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")
Input/Output 在客户端:
dan@dandesk:~$ netcat localhost 25000
35 # This was input
4 # This was input
8 # output
24157817 # output
现在,即使这可行,我也不会使用这个实现,因为它在一个单线程程序中做了一堆 CPU 绑定的工作,该程序也想服务 I/O在同一个线程中。这不会很好地扩展,也不会有理想的性能。相反,我建议在后台进程中使用 loop.run_in_executor
到 运行 对 fib
的调用,这允许异步线程 运行 满负荷运行,也允许我们跨多个内核扩展对 fib
的调用:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor
def fib(n):
if n < 1:
return 1
a = fib(n-1)
b = fib(n-2)
return a + b
def do_it(writer, result):
writer.write('{}\n'.format(result.result()).encode('ascii'))
asyncio.async(writer.drain())
@asyncio.coroutine
def fib_handler(reader, writer):
print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
executor = ProcessPoolExecutor(8) # 8 Processes in the pool
loop = asyncio.get_event_loop()
while True:
req = yield from reader.readline()
if not req:
break
print(req)
n = int(req)
result = loop.run_in_executor(executor, fib, n)
result.add_done_callback(partial(do_it, writer))
writer.close()
print("Closed")