如何在 Python 中同时迭代和 运行 AsyncGenerator
How to iterate and run AsyncGenerator concurrently in Python
对Python有点陌生,不知道这个问题是不是太天真了。试图掌握并发模型。
第三方功能(来自库)通过 ssh 连接到多个主机并执行一些 bash 命令。这个函数 returns AsyncGenerator.
然后我的代码使用 async for
:
遍历此 AsyncGenerator
# some code before
asyncgenerator_var = # ...
async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
if exit_code != 0:
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
continue
self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
# some code after
然后代码调用 await 函数。但是如果运行一个接着一个。不是同时。
代码有人解释这是为什么?以及应该怎么做才能使它 运行 同时发生。
在异步模型中,没有函数是同时 运行 的。如果当前功能是 await
-ing 其他功能/期货,事件循环可能会切换功能。
async for
语句本质上意味着事件循环可能 运行 其他预定 callbacks/tasks 迭代之间。
async for
正文仍然 运行 按照异步生成器生成的顺序。
以任意顺序 运行 正文,将其包装在异步函数中。为每个输入创建一个单独的任务,最后gather
所有任务的结果。
# some code before
asyncgenerator_var = # ...
async def task(host, exit_code, stdout, stderr):
if exit_code != 0:
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
return
self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
tasks = []
async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
tasks.append(asyncio.create_task(task, host, exit_code, stdout, stderr))
await asyncio.gather(*tasks)
# some code after
对Python有点陌生,不知道这个问题是不是太天真了。试图掌握并发模型。
第三方功能(来自库)通过 ssh 连接到多个主机并执行一些 bash 命令。这个函数 returns AsyncGenerator.
然后我的代码使用 async for
:
# some code before
asyncgenerator_var = # ...
async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
if exit_code != 0:
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
continue
self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
# some code after
然后代码调用 await 函数。但是如果运行一个接着一个。不是同时。
代码有人解释这是为什么?以及应该怎么做才能使它 运行 同时发生。
在异步模型中,没有函数是同时 运行 的。如果当前功能是 await
-ing 其他功能/期货,事件循环可能会切换功能。
async for
语句本质上意味着事件循环可能 运行 其他预定 callbacks/tasks 迭代之间。
async for
正文仍然 运行 按照异步生成器生成的顺序。
以任意顺序 运行 正文,将其包装在异步函数中。为每个输入创建一个单独的任务,最后gather
所有任务的结果。
# some code before
asyncgenerator_var = # ...
async def task(host, exit_code, stdout, stderr):
if exit_code != 0:
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
return
self.out(f"[{host}]: {''.join(decode_strings(stdout))}")
self.err(f"[{host}]: {''.join(decode_strings(stderr))}")
tasks = []
async for (host, exit_code, stdout, stderr) in asyncgenerator_var:
tasks.append(asyncio.create_task(task, host, exit_code, stdout, stderr))
await asyncio.gather(*tasks)
# some code after