Iterate through asyncio loop

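I'm quite new to aiohttp and asyncio, so apologies in advance for my ignorance. I'm having trouble with the event loop part of the documentation, and I don't think my code below is executing asynchronously. I'm trying to take all combinations of two lists via itertools and POST each one as XML. A more complete version using the requests module is listed here, but that isn't ideal because I may need to POST 1000+ requests at a time. Here's a sample of what it looks like now: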

import aiohttp
import asyncio
import itertools

skillid = ['7715','7735','7736','7737','7738','7739','7740','7741','7742','7743','7744','7745','7746','7747','7748' ,'7749','7750','7751','7752','7753','7754','7755','7756','7757','7758','7759','7760','7761','7762','7763','7764','7765','7766','7767','7768','7769','7770','7771','7772','7773','7774','7775','7776','7777','7778','7779','7780','7781','7782','7783','7784']

agent= ['5124','5315','5331','5764','6049','6076','6192','6323','6669','7690','7716']

url = 'https://url'

user = 'user'
password = 'pass'
headers = {
    'Content-Type': 'application/xml'
}

async def main():
    async with aiohttp.ClientSession() as session:
        for x in itertools.product(agent, skillid):
            payload = "<operation><operationType>update</operationType><refURLs><refURL>/unifiedconfig/config/agent/" + x[0] + "</refURL></refURLs><changeSet><agent><skillGroupsRemoved><skillGroup><refURL>/unifiedconfig/config/skillgroup/" + x[1] + "</refURL></skillGroup></skillGroupsRemoved></agent></changeSet></operation>"
            # One POST per (agent, skillid) pair, awaited before the next one starts.
            async with session.post(url, auth=aiohttp.BasicAuth(user, password), data=payload, headers=headers) as resp:
                print(resp.status)
                print(await resp.text())


asyncio.run(main())

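I've seen that coroutines can be used, but I'm not sure that applies here, since there is only a single task to execute. Any clarification is appreciated.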

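Because you are making a request and then immediately await-ing it, you are only ever making one request at a time. If you want to parallelize everything, you need to separate making the requests from waiting for the responses, and you need to use something like asyncio.gather to wait for the requests in bulk.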
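To see why awaiting inside the loop serializes the work, here is a minimal sketch that needs no server. It assumes a hypothetical fake_request coroutine that sleeps for a fixed delay in place of a real HTTP call. Awaiting one call at a time costs the sum of the delays, while handing all the coroutines to asyncio.gather costs roughly the longest single delay.

```python
import asyncio
import time


# Hypothetical stand-in for one HTTP request: it sleeps instead of
# talking to a server, then returns its own index.
async def fake_request(n, delay):
    await asyncio.sleep(delay)
    return n


async def sequential(count, delay):
    # Awaiting inside the loop: each call must finish before the next starts.
    results = []
    for n in range(count):
        results.append(await fake_request(n, delay))
    return results


async def concurrent(count, delay):
    # Build all the coroutines first, then let gather wait on them together.
    # gather returns results in submission order, not completion order.
    return await asyncio.gather(*(fake_request(n, delay) for n in range(count)))


async def timed(coro):
    # Measure the wall-clock time taken by one awaitable.
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def demo():
    seq, seq_time = await timed(sequential(5, 0.2))
    con, con_time = await timed(concurrent(5, 0.2))
    return seq, seq_time, con, con_time


if __name__ == "__main__":
    seq, seq_time, con, con_time = asyncio.run(demo())
    print(f"sequential: {seq} in {seq_time:.2f}s")  # about 1.0 s (5 x 0.2 s)
    print(f"gather:     {con} in {con_time:.2f}s")  # about 0.2 s
```

Both versions return the same list. Only the elapsed time differs.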

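In the example below, I've modified your code to connect to a local httpbin instance for testing. I'm sending requests to the /delay/<value> endpoint, so each request takes a random amount of time to complete.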

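The theory of operation here is:

  • Move the request code into an asynchronous one_request function, which we use to build an array of tasks.

  • Use asyncio.gather to run all the tasks at once.

  • The one_request function returns an (agent, skillid, response) tuple, so that when we iterate over the responses we can tell which combination of parameters produced a given response.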

import aiohttp
import asyncio
import itertools
import random

skillid = [
    "7715", "7735", "7736", "7737", "7738", "7739", "7740", "7741", "7742",
    "7743", "7744", "7745", "7746", "7747", "7748", "7749", "7750", "7751",
    "7752", "7753", "7754", "7755", "7756", "7757", "7758", "7759", "7760",
    "7761", "7762", "7763", "7764", "7765", "7766", "7767", "7768", "7769",
    "7770", "7771", "7772", "7773", "7774", "7775", "7776", "7777", "7778",
    "7779", "7780", "7781", "7782", "7783", "7784",
]

agent = [
    "5124", "5315", "5331", "5764", "6049", "6076", "6192", "6323", "6669",
    "7690", "7716",
]

user = 'user'
password = 'pass'
headers = {
    'Content-Type': 'application/xml'
}


async def one_request(session, agent, skillid):
    # I'm setting `url` here because I want a random parameter for
    # each request. You would probably just set this once globally.
    delay = random.randint(0, 10)
    url = f'http://localhost:8787/delay/{delay}'

    payload = (
        "<operation>"
        "<operationType>update</operationType>"
        "<refURLs>"
        f"<refURL>/unifiedconfig/config/agent/{agent}</refURL>"
        "</refURLs>"
        "<changeSet>"
        "<agent>"
        "<skillGroupsRemoved><skillGroup>"
        f"<refURL>/unifiedconfig/config/skillgroup/{skillid}</refURL>"
        "</skillGroup></skillGroupsRemoved>"
        "</agent>"
        "</changeSet>"
        "</operation>"
    )

    # This shows when the task actually executes.
    print('req', agent, skillid)

    async with session.post(
            url, auth=aiohttp.BasicAuth(user, password),
            data=payload, headers=headers) as resp:
        return (agent, skillid, await resp.text())


async def main():
    tasks = []
    async with aiohttp.ClientSession() as session:
        # Add tasks to the `tasks` array
        for x in itertools.product(agent, skillid):
            task = asyncio.create_task(one_request(session, x[0], x[1]))
            tasks.append(task)

        print(f'making {len(tasks)} requests')

        # Run all the tasks and wait for them to complete. Return
        # values will end up in the `responses` list.
        responses = await asyncio.gather(*tasks)

        # Just print everything out.
        print(responses)


asyncio.run(main())

The code above makes 561 requests (11 agents times 51 skill IDs) and, with the random delays I introduced, runs in about 30 seconds.

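This code sends all of the requests at once. If you want to limit the maximum number of concurrent requests, you can introduce a Semaphore so that one_request blocks whenever there are too many active requests.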
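A minimal sketch of the Semaphore idea follows. It assumes a hypothetical cap of MAX_CONCURRENT = 3 and replaces the session.post call with asyncio.sleep so the example runs without a server. The stats dictionary exists only to record the peak number of requests in flight, so you can confirm that the cap holds.

```python
import asyncio

# Hypothetical cap on in-flight requests; tune it for your server.
MAX_CONCURRENT = 3


async def limited_request(sem, n, stats):
    # At most MAX_CONCURRENT coroutines can be inside this block at once;
    # the others wait at `async with sem` until a slot frees up.
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)  # stand-in for session.post(...)
        stats["active"] -= 1
        return n


async def main(total=20):
    # Create the semaphore inside the running event loop.
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"active": 0, "peak": 0}
    tasks = [asyncio.create_task(limited_request(sem, n, stats))
             for n in range(total)]
    results = await asyncio.gather(*tasks)
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(len(results), "requests, peak concurrency", peak)
```

To apply this to the answer's code, you would create the semaphore in main(), pass it to one_request, and wrap the session.post(...) block in `async with sem:`. Note that aiohttp's TCPConnector already caps open connections through its limit parameter (default 100). The semaphore is useful when you want a lower or per-operation limit.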

If you want to handle responses as they arrive, rather than waiting for everything to finish, look into the asyncio.wait method.
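One way to do that is to call asyncio.wait in a loop with return_when=asyncio.FIRST_COMPLETED. Each pass returns as soon as at least one task finishes, you handle the finished ones, and then you wait again on the rest. In the sketch below, a hypothetical fake_request coroutine stands in for one_request. The delays are chosen so that the requests complete in the reverse of the order in which they were submitted.

```python
import asyncio


async def fake_request(n, delay):
    # Hypothetical stand-in for one_request(); finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return n


async def main():
    # Later-numbered requests are faster, so completion order differs
    # from submission order.
    delays = [0.3, 0.2, 0.1]
    pending = {asyncio.create_task(fake_request(n, d))
               for n, d in enumerate(delays)}
    order = []
    while pending:
        # Return as soon as at least one task is done; the rest stay pending.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            result = task.result()
            order.append(result)
            print("got response", result)
    return order


if __name__ == "__main__":
    asyncio.run(main())
```

Unlike asyncio.gather, which always hands back results in submission order once everything is done, this loop processes each response the moment it is ready.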