Python asyncio, aiohttp: 动态更新任务循环应该工作直到特定条件
Python asyncio, aiohttp: dynamically updating tasks loop should work until particular condition
我有用于 api 的简单脚本。我创建了 number_of_user
个用户。我通过循环,为每个用户创建随机用户 create_random_user()
,我创建任务并将任务附加到循环。用户创建任务 async def fetch_user_create
在获得响应后为用户登录创建另一个任务 async def fetch_user_log
并将其添加到所有任务。我的问题:我怎么能等到 len(tasks_user) == 2 * number_of_user
.
我尝试放置 await asyncio.sleep(1) - 它可以工作,但取决于 number_of_user.
目标:等到条件可能吗?还是我哪里做错了?
async def fetch_user_log(session, data_user):
async with session.post(url_user_login, data=data_user) as response:
async def fetch_user_create(session, data_user, tasks_user_create):
async with session.post(url_user_create, data=data_user) as response:
task2 = asyncio.ensure_future(fetch_user_log(session, data_user))
tasks_user.append(task2)
#await asyncio.sleep(1)
await response.read()
#await asyncio.gather(*tasks_user) - tried
async def run():
tasks_user = []
async with ClientSession() as session:
for i in range(number_of_user):
data_user = create_random_user()
task = asyncio.ensure_future(fetch_user_create(session, data_user, tasks_user_create))
tasks_user.append(task)
await asyncio.wait(tasks_user)
#await asyncio.gather(*tasks_user) - tried
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
等到条件可以用asyncio.Condition实现:
import asyncio
async def test_wait_for(cond, tasks):
print(".")
async with cond:
await cond.wait_for(lambda: (len(tasks)>3))
print("!")
async def test_add_task(cond, tasks):
for i in range(6):
print(i)
await asyncio.sleep(1)
async with cond:
tasks.append(i)
cond.notify_all()
async def run():
cond = asyncio.Condition()
tasks = []
asyncio.ensure_future(test_wait_for(cond, tasks))
await test_add_task(cond, tasks)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
条件为事件+锁定。使用 notify_all
发送的事件会解锁所有 wait
和 wait_for
协程。在这里锁定也会锁定任务数组。
如果将条件移动到发送通知的任务,也可以用asyncio.Event实现。
我有用于 api 的简单脚本。我创建了 number_of_user
个用户。我通过循环,为每个用户创建随机用户 create_random_user()
,我创建任务并将任务附加到循环。用户创建任务 async def fetch_user_create
在获得响应后为用户登录创建另一个任务 async def fetch_user_log
并将其添加到所有任务。我的问题:我怎么能等到 len(tasks_user) == 2 * number_of_user
.
我尝试放置 await asyncio.sleep(1) - 它可以工作,但取决于 number_of_user.
目标:等到条件可能吗?还是我哪里做错了?
async def fetch_user_log(session, data_user):
async with session.post(url_user_login, data=data_user) as response:
async def fetch_user_create(session, data_user, tasks_user_create):
async with session.post(url_user_create, data=data_user) as response:
task2 = asyncio.ensure_future(fetch_user_log(session, data_user))
tasks_user.append(task2)
#await asyncio.sleep(1)
await response.read()
#await asyncio.gather(*tasks_user) - tried
async def run():
tasks_user = []
async with ClientSession() as session:
for i in range(number_of_user):
data_user = create_random_user()
task = asyncio.ensure_future(fetch_user_create(session, data_user, tasks_user_create))
tasks_user.append(task)
await asyncio.wait(tasks_user)
#await asyncio.gather(*tasks_user) - tried
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
等到条件可以用asyncio.Condition实现:
import asyncio
async def test_wait_for(cond, tasks):
print(".")
async with cond:
await cond.wait_for(lambda: (len(tasks)>3))
print("!")
async def test_add_task(cond, tasks):
for i in range(6):
print(i)
await asyncio.sleep(1)
async with cond:
tasks.append(i)
cond.notify_all()
async def run():
cond = asyncio.Condition()
tasks = []
asyncio.ensure_future(test_wait_for(cond, tasks))
await test_add_task(cond, tasks)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
条件为事件+锁定。使用 notify_all
发送的事件会解锁所有 wait
和 wait_for
协程。在这里锁定也会锁定任务数组。
如果将条件移动到发送通知的任务,也可以用asyncio.Event实现。