从 concurrent.futures 到 asyncio
From concurrent.futures to asyncio
我有两个关于 concurrent.futures 的问题:
如何在 python concurrent.futures 中打破 time.sleep()?
结论:time.sleep()不能中断。一种解决方案是:您可以围绕它编写一个循环并进行短暂的睡眠。
见
个别超时 concurrent.futures?
结论:个别超时需要由用户实现。例如:对于每个超时,您可以调用 wait().
见
问题
asyncio是否解决了这些问题?
在 asyncio 模型中,执行是由事件循环调度和协调的。要取消当前挂起任务的执行,您基本上只需要不恢复它。虽然这在实践中略有不同,但显而易见的是,这使得取消挂起的任务在理论上变得简单。
个别超时当然也可能以同样的方式出现:每当您挂起协程以等待结果时,您都需要提供一个超时值。事件循环将确保在达到超时且任务尚未完成时取消等待任务。
一些具体示例:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
...
concurrent.futures._base.CancelledError
在实践中,这可以使用如下方式实现:
class Foo:
task = None
async def sleeper(self):
self.task = asyncio.sleep(60)
try:
await self.task
except concurrent.futures.CancelledError:
raise NotImplementedError
当此方法处于休眠状态时,其他人可以调用 foo.task.cancel()
唤醒协程并让它处理取消。或者,任何调用 sleeper()
的人都可以直接取消 它,而不给它清理的机会。
设置超时同样简单:
>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
...
concurrent.futures._base.TimeoutError
特别是在 HTTP 请求超时的上下文中,请参阅 aiohttp:
async def fetch_page(session, url):
with aiohttp.Timeout(10):
async with session.get(url) as response:
assert response.status == 200
return await response.read()
with aiohttp.ClientSession(loop=loop) as session:
content = loop.run_until_complete(fetch_page(session, 'http://python.org'))
显然,每次调用 fetch_page
都可以决定自己的 aiohttp.Timeout
值,并且当达到超时时,每个单独的实例都会抛出自己的异常。
您可以在其异常中立即提出(asyncio.CancelledError
)。
我用这个方法来克服它:
import asyncio
async def worker():
try:
# await for some coroutine process
except asyncio.CancelledError:
# Do stuff
raise asyncio.CancelledError()
except Exception as exc:
# Do stuff
print(exc)
finally:
await asyncio.sleep(2)
我有两个关于 concurrent.futures 的问题:
如何在 python concurrent.futures 中打破 time.sleep()?
结论:time.sleep()不能中断。一种解决方案是:您可以围绕它编写一个循环并进行短暂的睡眠。
见
个别超时 concurrent.futures?
结论:个别超时需要由用户实现。例如:对于每个超时,您可以调用 wait().
见
问题
asyncio是否解决了这些问题?
在 asyncio 模型中,执行是由事件循环调度和协调的。要取消当前挂起任务的执行,您基本上只需要不恢复它。虽然这在实践中略有不同,但显而易见的是,这使得取消挂起的任务在理论上变得简单。
个别超时当然也可能以同样的方式出现:每当您挂起协程以等待结果时,您都需要提供一个超时值。事件循环将确保在达到超时且任务尚未完成时取消等待任务。
一些具体示例:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
...
concurrent.futures._base.CancelledError
在实践中,这可以使用如下方式实现:
class Foo:
task = None
async def sleeper(self):
self.task = asyncio.sleep(60)
try:
await self.task
except concurrent.futures.CancelledError:
raise NotImplementedError
当此方法处于休眠状态时,其他人可以调用 foo.task.cancel()
唤醒协程并让它处理取消。或者,任何调用 sleeper()
的人都可以直接取消 它,而不给它清理的机会。
设置超时同样简单:
>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
...
concurrent.futures._base.TimeoutError
特别是在 HTTP 请求超时的上下文中,请参阅 aiohttp:
async def fetch_page(session, url):
with aiohttp.Timeout(10):
async with session.get(url) as response:
assert response.status == 200
return await response.read()
with aiohttp.ClientSession(loop=loop) as session:
content = loop.run_until_complete(fetch_page(session, 'http://python.org'))
显然,每次调用 fetch_page
都可以决定自己的 aiohttp.Timeout
值,并且当达到超时时,每个单独的实例都会抛出自己的异常。
您可以在其异常中立即提出(asyncio.CancelledError
)。
我用这个方法来克服它:
import asyncio
async def worker():
try:
# await for some coroutine process
except asyncio.CancelledError:
# Do stuff
raise asyncio.CancelledError()
except Exception as exc:
# Do stuff
print(exc)
finally:
await asyncio.sleep(2)