如何在 Python 中同时迭代和 运行 AsyncGenerator

How to iterate and run AsyncGenerator concurrently in Python

对Python有点陌生,不知道这个问题是不是太天真了。试图掌握并发模型。

第三方功能(来自库)通过 ssh 连接到多个主机并执行一些 bash 命令。这个函数 returns AsyncGenerator.

然后我的代码使用 async for:

遍历此 AsyncGenerator
        # some code before
        asyncgenerator_var = # ...
        async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
            if exit_code != 0:
                self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
                continue
            self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
            self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
        # some code after

然后代码调用 await 函数。但是如果运行一个接着一个。不是同时。

代码有人解释这是为什么?以及应该怎么做才能使它 运行 同时发生。

在异步模型中,没有函数是同时 运行 的。如果当前功能是 await-ing 其他功能/期货,事件循环可能会切换功能。

async for 语句本质上意味着事件循环可能 运行 其他预定 callbacks/tasks 迭代之间。

async for 正文仍然 运行 按照异步生成器生成的顺序。

以任意顺序 运行 正文,将其包装在异步函数中。为每个输入创建一个单独的任务,最后gather所有任务的结果。

# some code before

asyncgenerator_var =  # ...

async def task(host, exit_code, stdout, stderr):
    if exit_code != 0:
        self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
        return 
    self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
    self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
    
tasks = []
async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
    tasks.append(asyncio.create_task(task, host, exit_code, stdout, stderr))

await asyncio.gather(*tasks)
    
# some code after