asyncio.gather 的替代方案,我可以在运行时继续向其中添加协程?
Alternative to asyncio.gather which I can keep adding coroutines to at runtime?
我需要能够在运行时不断向 asyncio 循环中添加协程。我尝试使用 create_task()
认为这会做我想要的,但仍然需要等待。
这是我的代码,不确定是否有简单的编辑可以让它工作?
async def get_value_from_api():
global ASYNC_CLIENT
return ASYNC_CLIENT.get(api_address)
async def print_subs():
count = await get_value_from_api()
print(count)
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
time.sleep(0.1)
async def start():
global ASYNC_CLIENT
async with httpx.AsyncClient() as ASYNC_CLIENT:
await save_subs_loop()
asyncio.run(start())
我曾经在混合 trio
和 kivy
时创建了 similar pattern,这是 运行 异步宁多个协程的演示。
它使用了一个trio.MemoryChannel
,大致等同于asyncio.Queue
,我在这里将其称为queue
。
主要思想是:
- 用 class 包装每个任务,它具有 运行 功能。
- 使 class 对象自己的异步方法在执行完成时将对象本身放入
queue
。
- 创建全局任务生成循环以等待
queue
中的对象并为该对象安排 execution/create 任务。
import asyncio
import traceback
import httpx
async def task_1(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/")
print(resp.read())
await asyncio.sleep(0.1) # without this would be IP ban
async def task_2(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/meow/")
print(resp.read())
await asyncio.sleep(0.5)
class CoroutineWrapper:
def __init__(self, queue: asyncio.Queue, coro_func, *param):
self.func = coro_func
self.param = param
self.queue = queue
async def run(self):
try:
await self.func(*self.param)
except Exception:
traceback.print_exc()
return
# put itself back into queue
await self.queue.put(self)
class KeepRunning:
def __init__(self):
# queue for gathering CoroutineWrapper
self.queue = asyncio.Queue()
def add_task(self, coro, *param):
wrapped = CoroutineWrapper(self.queue, coro, *param)
# add tasks to be executed in queue
self.queue.put_nowait(wrapped)
async def task_processor(self):
task: CoroutineWrapper
while task := await self.queue.get():
# wait for new CoroutineWrapper Object then schedule it's async method execution
asyncio.create_task(task.run())
async def main():
keep_running = KeepRunning()
async with httpx.AsyncClient() as client:
keep_running.add_task(task_1, client)
keep_running.add_task(task_2, client)
await keep_running.task_processor()
asyncio.run(main())
服务器
import time
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return str(time.time())
@app.route("/meow/")
def meow():
return "meow"
app.run()
输出:
b'meow'
b'1639920445.965701'
b'1639920446.0767004'
b'1639920446.1887035'
b'1639920446.2986999'
b'1639920446.4067013'
b'meow'
b'1639920446.516704'
b'1639920446.6267014'
...
您可以看到任务 运行ning 按自己的节奏重复。
旧答案
看来您只想循环固定数量的任务。
在这种情况下,只需使用 itertools.cycle
迭代协程列表
但这与同步没有什么不同,所以让我知道您是否需要异步。
import asyncio
import itertools
import httpx
async def main_task(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/")
print(resp.read())
await asyncio.sleep(0.1) # without this would be IP ban
async def main():
async with httpx.AsyncClient() as client:
for coroutine in itertools.cycle([main_task]):
await coroutine(client)
asyncio.run(main())
服务器:
import time
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return str(time.time())
app.run()
输出:
b'1639918937.7694323'
b'1639918937.8804302'
b'1639918937.9914327'
b'1639918938.1014295'
b'1639918938.2124324'
b'1639918938.3204308'
...
asyncio.create_task()
按照您的描述工作。你在这里遇到的问题是你在这里创建了一个无限循环:
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
time.sleep(0.1) # do not use time.sleep() in async code EVER
save_subs_loop()
不断创建任务,但控制权永远不会交还给事件循环,因为那里没有 await
。尝试
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
await asyncio.sleep(0.1) # yield control back to loop to give tasks a chance to actually run
这个问题很常见,我想 python 如果在协程中检测到 time.sleep()
应该引发 RuntimeError
:-)
您可能想尝试 TaskThread 框架
- 它允许您在运行时添加任务
- 任务是 re-scheduled 周期性的(就像上面的 while 循环一样)
- 您似乎需要一个内置的消费者/生产者框架(parent/child 关系)
免责声明:我出于需要编写了 TaskThread,它救了我的命。
我需要能够在运行时不断向 asyncio 循环中添加协程。我尝试使用 create_task()
认为这会做我想要的,但仍然需要等待。
这是我的代码,不确定是否有简单的编辑可以让它工作?
async def get_value_from_api():
global ASYNC_CLIENT
return ASYNC_CLIENT.get(api_address)
async def print_subs():
count = await get_value_from_api()
print(count)
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
time.sleep(0.1)
async def start():
global ASYNC_CLIENT
async with httpx.AsyncClient() as ASYNC_CLIENT:
await save_subs_loop()
asyncio.run(start())
我曾经在混合 trio
和 kivy
时创建了 similar pattern,这是 运行 异步宁多个协程的演示。
它使用了一个trio.MemoryChannel
,大致等同于asyncio.Queue
,我在这里将其称为queue
。
主要思想是:
- 用 class 包装每个任务,它具有 运行 功能。
- 使 class 对象自己的异步方法在执行完成时将对象本身放入
queue
。 - 创建全局任务生成循环以等待
queue
中的对象并为该对象安排 execution/create 任务。
import asyncio
import traceback
import httpx
async def task_1(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/")
print(resp.read())
await asyncio.sleep(0.1) # without this would be IP ban
async def task_2(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/meow/")
print(resp.read())
await asyncio.sleep(0.5)
class CoroutineWrapper:
def __init__(self, queue: asyncio.Queue, coro_func, *param):
self.func = coro_func
self.param = param
self.queue = queue
async def run(self):
try:
await self.func(*self.param)
except Exception:
traceback.print_exc()
return
# put itself back into queue
await self.queue.put(self)
class KeepRunning:
def __init__(self):
# queue for gathering CoroutineWrapper
self.queue = asyncio.Queue()
def add_task(self, coro, *param):
wrapped = CoroutineWrapper(self.queue, coro, *param)
# add tasks to be executed in queue
self.queue.put_nowait(wrapped)
async def task_processor(self):
task: CoroutineWrapper
while task := await self.queue.get():
# wait for new CoroutineWrapper Object then schedule it's async method execution
asyncio.create_task(task.run())
async def main():
keep_running = KeepRunning()
async with httpx.AsyncClient() as client:
keep_running.add_task(task_1, client)
keep_running.add_task(task_2, client)
await keep_running.task_processor()
asyncio.run(main())
服务器
import time
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return str(time.time())
@app.route("/meow/")
def meow():
return "meow"
app.run()
输出:
b'meow'
b'1639920445.965701'
b'1639920446.0767004'
b'1639920446.1887035'
b'1639920446.2986999'
b'1639920446.4067013'
b'meow'
b'1639920446.516704'
b'1639920446.6267014'
...
您可以看到任务 运行ning 按自己的节奏重复。
旧答案
看来您只想循环固定数量的任务。
在这种情况下,只需使用 itertools.cycle
但这与同步没有什么不同,所以让我知道您是否需要异步。
import asyncio
import itertools
import httpx
async def main_task(client: httpx.AsyncClient):
resp = await client.get("http://127.0.0.1:5000/")
print(resp.read())
await asyncio.sleep(0.1) # without this would be IP ban
async def main():
async with httpx.AsyncClient() as client:
for coroutine in itertools.cycle([main_task]):
await coroutine(client)
asyncio.run(main())
服务器:
import time
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return str(time.time())
app.run()
输出:
b'1639918937.7694323'
b'1639918937.8804302'
b'1639918937.9914327'
b'1639918938.1014295'
b'1639918938.2124324'
b'1639918938.3204308'
...
asyncio.create_task()
按照您的描述工作。你在这里遇到的问题是你在这里创建了一个无限循环:
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
time.sleep(0.1) # do not use time.sleep() in async code EVER
save_subs_loop()
不断创建任务,但控制权永远不会交还给事件循环,因为那里没有 await
。尝试
async def save_subs_loop():
while True:
asyncio.create_task(print_subs())
await asyncio.sleep(0.1) # yield control back to loop to give tasks a chance to actually run
这个问题很常见,我想 python 如果在协程中检测到 time.sleep()
应该引发 RuntimeError
:-)
您可能想尝试 TaskThread 框架
- 它允许您在运行时添加任务
- 任务是 re-scheduled 周期性的(就像上面的 while 循环一样)
- 您似乎需要一个内置的消费者/生产者框架(parent/child 关系)
免责声明:我出于需要编写了 TaskThread,它救了我的命。