从 concurrent.futures 到 asyncio

From concurrent.futures to asyncio

我有两个关于 concurrent.futures 的问题:

如何在 python concurrent.futures 中打破 time.sleep()?

结论:time.sleep()不能中断。一种解决方案是:您可以围绕它编写一个循环并进行短暂的睡眠。

个别超时 concurrent.futures?

结论:个别超时需要由用户实现。例如:对于每个超时,您可以调用 wait().

问题

asyncio是否解决了这些问题?

在 asyncio 模型中,执行是由事件循环调度和协调的。要取消当前挂起任务的执行,您基本上只需要不恢复它。虽然这在实践中略有不同,但显而易见的是,这使得取消挂起的任务在理论上变得简单。

个别超时当然也可能以同样的方式出现:每当您挂起协程以等待结果时,您都需要提供一个超时值。事件循环将确保在达到超时且任务尚未完成时取消等待任务。

一些具体示例:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
   ...
concurrent.futures._base.CancelledError

在实践中,这可以使用如下方式实现:

class Foo:
    task = None

    async def sleeper(self):
        self.task = asyncio.sleep(60)
        try:
            await self.task
        except concurrent.futures.CancelledError:
            raise NotImplementedError

当此方法处于休眠状态时,其他人可以调用 foo.task.cancel() 唤醒协程并让它处理取消。或者,任何调用 sleeper() 的人都可以直接取消 ,而不给它清理的机会。

设置超时同样简单:

>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
   ...
concurrent.futures._base.TimeoutError

特别是在 HTTP 请求超时的上下文中,请参阅 aiohttp:

async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))

显然,每次调用 fetch_page 都可以决定自己的 aiohttp.Timeout 值,并且当达到超时时,每个单独的实例都会抛出自己的异常。

您可以在其异常中立即提出(asyncio.CancelledError)。

我用这个方法来克服它:

import asyncio

async def worker():
    try:
        # await for some coroutine process
    except asyncio.CancelledError:
        # Do stuff
        raise asyncio.CancelledError()
    except Exception as exc:
        # Do stuff
        print(exc)
    finally:
        await asyncio.sleep(2)