Is yielding from inside a nursery in an asynchronous generator function bad?

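I've been told that the following code is not safe, because an asynchronous generator is not allowed to yield from inside a nursery unless it is an asynchronous context manager.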

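import math
from contextlib import aclosing  # Python 3.10+; on older versions: from async_generator import aclosing
from typing import AsyncIterable, TypeVar

import trio

T = TypeVar('T')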

async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
    """Delays each item in source by an interval.

    Received items are temporarily stored in an unbounded queue, along with a timestamp, using
    a background task. The foreground task takes items from the queue, and waits until the
    item is older than the given interval and then yields it."""

    send_channel, receive_channel = trio.open_memory_channel(math.inf)

    async def pull_task():
        async with aclosing(source) as agen:
            async for item in agen:
                send_channel.send_nowait((item, trio.current_time() + interval))

    async with trio.open_nursery() as nursery:
        nursery.start_soon(pull_task)
        async with receive_channel:
            async for item, timestamp in receive_channel:
                now = trio.current_time()
                if timestamp > now:
                    await trio.sleep(timestamp - now)
                yield item

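I'm struggling to understand how this could possibly break. If anyone can provide example code that uses this exact generator function and demonstrates the unsafety, it would be greatly appreciated and rewarded.

The goal of the code above is to delay the processing of an asynchronous sequence without applying any backpressure. If you can demonstrate that this code doesn't work the way I expect, that would also be appreciated.

Thank you.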

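Unfortunately, this is correct: yield inside a nursery or cancel scope is not supported, except in the narrow cases of using @contextlib.asynccontextmanager to create an async context manager or writing an async pytest fixture.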

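There are several reasons for this. Some of them are technical: Trio has to keep track of which nurseries/cancel scopes are currently "active" on the stack, and when you yield out of one, that breaks the nesting and Trio has no way to know that you've done it. (A library has no way to detect a yield out of a context manager.)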
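To make the nesting problem concrete, here is a toy model in plain Python. It is not Trio's implementation: `active_scopes`, `toy_scope`, `numbers` and `demo` are hypothetical names invented for this sketch, and the list only stands in for whatever bookkeeping a real library keeps. The point is that the context manager receives no notification when the generator suspends, so the scope stays "active" while unrelated code runs.

```python
from contextlib import contextmanager

# Hypothetical stand-in for a library's record of currently active scopes.
active_scopes = []


@contextmanager
def toy_scope(name):
    """Push a scope on entry; on exit, insist that it is the innermost one."""
    active_scopes.append(name)
    try:
        yield
    finally:
        innermost = active_scopes[-1]
        active_scopes.remove(name)
        if innermost != name:
            raise RuntimeError(
                f"scope stack corrupted: exiting {name!r} while {innermost!r} is innermost"
            )


def numbers():
    with toy_scope("inside-generator"):
        yield 1  # suspends here with the scope still open
        yield 2


def demo():
    gen = numbers()
    next(gen)  # the generator's scope is now on the stack
    try:
        with toy_scope("caller"):  # the caller opens a scope "inside" it
            gen.close()  # the generator now exits its scope out of order
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(demo())
```

The toy scope only notices the problem at exit time, after the nesting has already been violated, which is roughly the position the "Cancel scope stack corrupted" error quoted further down reports from.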

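But there's also a fundamental, unsolvable reason: the whole idea of Trio and structured concurrency is that every task "belongs" to a parent task, which can be notified if the child task crashes. When you yield in a generator, though, the generator frame is frozen and detached from the current task: it might be resumed in a different task, or never resumed at all. So when you yield, you break the link between all the child tasks in the nursery and their parent. There's just no way to reconcile that with the principles of structured concurrency.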
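The "resumed in a different task" part can be observed directly. The sketch below uses the standard library's asyncio rather than Trio, purely so it runs without extra dependencies; the behaviour of generator frames is a Python language feature, not something specific to either library. The names `who_runs_me` and `pull_one` are made up for this illustration.

```python
import asyncio


async def who_runs_me():
    """Yield the name of whichever task is currently executing this body."""
    while True:
        yield asyncio.current_task().get_name()


async def pull_one(agen):
    # Resume the generator once from inside the calling task.
    return await agen.__anext__()


async def main():
    agen = who_runs_me()
    # The same generator frame is resumed first by one task, then by another.
    first = await asyncio.create_task(pull_one(agen), name="task-A")
    second = await asyncio.create_task(pull_one(agen), name="task-B")
    await agen.aclose()
    return first, second


result = asyncio.run(main())
print(result)  # ('task-A', 'task-B')
```

A nursery opened inside such a generator would have no single task to call its parent, since the frame holding it runs in whichever task happens to iterate next.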

In the Trio chat, Joshua Oreman gave a specific example that breaks in your case:

if I run the following

async def arange(*args):
    for val in range(*args):
        yield val

async def break_it():
    async with aclosing(delay(0, arange(3))) as aiter:
        with trio.move_on_after(1):
            async for value in aiter:
                await trio.sleep(0.4)
                print(value)

trio.run(break_it)

then I get

RuntimeError: Cancel scope stack corrupted: attempted to exit
<trio.CancelScope at 0x7f364621c280, active, cancelled> in <Task
'__main__.break_it' at 0x7f36462152b0> that's still within its child
<trio.CancelScope at 0x7f364621c400, active>

This is probably a bug in your code, that has caused Trio's internal
state to become corrupted. We'll do our best to recover, but from now
on there are no guarantees.

Typically this is caused by one of the following:
  - yielding within a generator or async generator that's opened a cancel
    scope or nursery (unless the generator is a @contextmanager or
    @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 [...]

By changing the timeouts and delay so that the timeout expired while inside the generator rather than while outside of it, I was able to get a different error also: trio.MultiError: Cancelled(), GeneratorExit() raised out of aclosing()

There's also a long discussion of all these issues here, which is where we figured out that it can't be supported: https://github.com/python-trio/trio/issues/264

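It's an unfortunate situation, both because it's a shame that we can't support it, and even worse, because it looks like it works in simple cases, so people can end up writing a lot of code that uses this trick before realizing that it doesn't work :-(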

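Our plan is to make the illegal cases give an obvious error immediately when you try to yield, which will at least avoid the second problem. But this will take a while, because it requires adding some extra hooks to the Python interpreter.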

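It's also possible to create a construct that's almost as easy to write and use as an async generator, but that avoids this problem. The idea is that instead of pushing and popping the generator on and off the stack of the task that's using it, you run the "generator" code as a second task that feeds values to the consumer task. See the thread starting here for details.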