实现异步迭代器

Implementing an asynchronous iterator

Per PEP-492 我正在尝试实现一个异步迭代器,这样我就可以做,例如

async for foo in bar:
    ...

这是一个简单的例子,类似于文档中的例子,带有非常基本的实例化和异步迭代测试:

import pytest

class TestImplementation:
    def __aiter__(self):
        return self
    async def __anext__(self):
        raise StopAsyncIteration


@pytest.mark.asyncio  # note use of pytest-asyncio marker
async def test_async_for():
    async for _ in TestImplementation():
        pass

但是,当我执行我的测试套件时,我看到:

=================================== FAILURES ===================================
________________________________ test_async_for ________________________________

    @pytest.mark.asyncio
    async def test_async_for():
>       async for _ in TestImplementation():
E       TypeError: 'async for' received an invalid object from __aiter__: TestImplementation

...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================

为什么我的 TestImplementation 似乎无效?据我所知它符合协议:

  1. An object must implement an __aiter__ method ... returning an asynchronous iterator object.
  2. An asynchronous iterator object must implement an __anext__ method ... returning an awaitable.
  3. To stop iteration __anext__ must raise a StopAsyncIteration exception.

最新发布的 Python (3.5.1)、py.test (2.9.2) 和 pytest-asyncio (0.4.1) 版本失败。

如果您阅读 a little further down the documentation,它会提到(强调我的):

PEP 492 was accepted in CPython 3.5.0 with __aiter__ defined as a method, that was expected to return an awaitable resolving to an asynchronous iterator.

In 3.5.2 (as PEP 492 was accepted on a provisional basis) the __aiter__ protocol was updated to return asynchronous iterators directly.

因此,对于 3.5.2 之前的版本(2016 年 6 月 27 日发布),文档与如何编写有效的异步迭代器略有不一致。 3.5.0 和 3.5.1 的固定版本如下所示:

class TestImplementation:
    async def __aiter__(self):
  # ^ note
        return self
    async def __anext__(self):
        raise StopAsyncIteration

这是在关闭 bug #27243 and is a little clearer in the data model documentation 时引入的,这也提出了一种编写向后兼容代码的方法。

异步迭代器已在 Python 3.6 中实现 - 参见 PEP-525

那么您根本不需要 TestImplementation 来使用 async for。您可以只使用 yield(取自 PEP-525 的示例):

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

然后您可以按预期使用 async for

async for i in ticker(1, 10):                                                                     
    print(f'Tick #{i}')