asyncio.gather 的替代方案,我可以在运行时继续向其中添加协程?

Alternative to asyncio.gather which I can keep adding coroutines to at runtime?

我需要能够在运行时不断向 asyncio 循环中添加协程。我尝试使用 create_task() 认为这会做我想要的,但仍然需要等待。

这是我的代码,不确定是否有简单的编辑可以让它工作?

async def get_value_from_api():
    global ASYNC_CLIENT
    return ASYNC_CLIENT.get(api_address)


async def print_subs():
    count = await get_value_from_api()
    print(count)


async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1)


async def start():
    global ASYNC_CLIENT
    async with httpx.AsyncClient() as ASYNC_CLIENT:
        await save_subs_loop()


asyncio.run(start())

我曾经在混合 triokivy 时创建了 similar pattern,这是 运行 异步宁多个协程的演示。

它使用了一个trio.MemoryChannel,大致等同于asyncio.Queue,我在这里将其称为queue

主要思想是:

  1. 用 class 包装每个任务,它具有 运行 功能。
  2. 使 class 对象自己的异步方法在执行完成时将对象本身放入 queue
  3. 创建全局任务生成循环以等待 queue 中的对象并为该对象安排 execution/create 任务。
import asyncio
import traceback

import httpx


async def task_1(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # without this would be IP ban


async def task_2(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/meow/")
    print(resp.read())
    await asyncio.sleep(0.5)


class CoroutineWrapper:
    def __init__(self, queue: asyncio.Queue,  coro_func, *param):
        self.func = coro_func
        self.param = param
        self.queue = queue

    async def run(self):
        try:
            await self.func(*self.param)
        except Exception:
            traceback.print_exc()
            return
        
        # put itself back into queue
        await self.queue.put(self)


class KeepRunning:
    def __init__(self):
        # queue for gathering CoroutineWrapper
        self.queue = asyncio.Queue()

    def add_task(self, coro, *param):
        wrapped = CoroutineWrapper(self.queue, coro, *param)
        
        # add tasks to be executed in queue
        self.queue.put_nowait(wrapped)

    async def task_processor(self):
        task: CoroutineWrapper
        while task := await self.queue.get():
            # wait for new CoroutineWrapper Object then schedule it's async method execution
            asyncio.create_task(task.run())


async def main():
    keep_running = KeepRunning()
    async with httpx.AsyncClient() as client:
        keep_running.add_task(task_1, client)
        keep_running.add_task(task_2, client)

        await keep_running.task_processor()

asyncio.run(main())

服务器

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


@app.route("/meow/")
def meow():
    return "meow"


app.run()

输出:

b'meow'
b'1639920445.965701'
b'1639920446.0767004'
b'1639920446.1887035'
b'1639920446.2986999'
b'1639920446.4067013'
b'meow'
b'1639920446.516704'
b'1639920446.6267014'
...

您可以看到任务 运行ning 按自己的节奏重复。


旧答案

看来您只想循环固定数量的任务。

在这种情况下,只需使用 itertools.cycle

迭代协程列表

但这与同步没有什么不同,所以让我知道您是否需要异步。

import asyncio
import itertools

import httpx


async def main_task(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # without this would be IP ban


async def main():
    async with httpx.AsyncClient() as client:
        for coroutine in itertools.cycle([main_task]):
            await coroutine(client)


asyncio.run(main())

服务器:

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


app.run()

输出:

b'1639918937.7694323'
b'1639918937.8804302'
b'1639918937.9914327'
b'1639918938.1014295'
b'1639918938.2124324'
b'1639918938.3204308'
...

asyncio.create_task() 按照您的描述工作。你在这里遇到的问题是你在这里创建了一个无限循环:

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1) # do not use time.sleep() in async code EVER

save_subs_loop() 不断创建任务,但控制权永远不会交还给事件循环,因为那里没有 await。尝试

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        await asyncio.sleep(0.1) # yield control back to loop to give tasks a chance to actually run

这个问题很常见,我想 python 如果在协程中检测到 time.sleep() 应该引发 RuntimeError :-)

您可能想尝试 TaskThread 框架

  • 它允许您在运行时添加任务
  • 任务是 re-scheduled 周期性的(就像上面的 while 循环一样)
  • 您似乎需要一个内置的消费者/生产者框架(parent/child 关系)

免责声明:我出于需要编写了 TaskThread,它救了我的命。