Python asyncio, aiohttp: 动态更新任务循环应该工作直到特定条件

Python asyncio, aiohttp: dynamically updating tasks loop should work until particular condition

我有用于 api 的简单脚本。我创建了 number_of_user 个用户。我通过循环,为每个用户创建随机用户 create_random_user(),我创建任务并将任务附加到循环。用户创建任务 async def fetch_user_create 在获得响应后为用户登录创建另一个任务 async def fetch_user_log 并将其添加到所有任务。我的问题:我怎么能等到 len(tasks_user) == 2 * number_of_user.

我尝试放置 await asyncio.sleep(1) - 它可以工作,但取决于 number_of_user.

目标:等到条件可能吗?还是我哪里做错了?

async def fetch_user_log(session, data_user):
    async with session.post(url_user_login, data=data_user) as response:

async def fetch_user_create(session, data_user, tasks_user_create):
    async with session.post(url_user_create, data=data_user) as response:
        task2 = asyncio.ensure_future(fetch_user_log(session, data_user))
        tasks_user.append(task2)
        #await asyncio.sleep(1)
        await response.read()
        
        #await asyncio.gather(*tasks_user) - tried

async def run():
    tasks_user = []
    async with ClientSession() as session:
        for i in range(number_of_user):
            data_user = create_random_user()
            task = asyncio.ensure_future(fetch_user_create(session, data_user, tasks_user_create))
            tasks_user.append(task)
        await asyncio.wait(tasks_user)
        #await asyncio.gather(*tasks_user) - tried

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

等到条件可以用asyncio.Condition实现:

import asyncio

async def test_wait_for(cond, tasks):
    print(".")
    async with cond:
        await cond.wait_for(lambda: (len(tasks)>3))
    print("!")

async def test_add_task(cond, tasks):
    for i in range(6):
        print(i)
        await asyncio.sleep(1)
        async with cond:
            tasks.append(i)
            cond.notify_all()

async def run():
    cond = asyncio.Condition()
    tasks = []
    asyncio.ensure_future(test_wait_for(cond, tasks))
    await test_add_task(cond, tasks)
    


loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

条件为事件+锁定。使用 notify_all 发送的事件会解锁所有 waitwait_for 协程。在这里锁定也会锁定任务数组。

如果将条件移动到发送通知的任务,也可以用asyncio.Event实现。