遍历异步循环
Iterate through asyncio loop
我对 aiohttp 和 asyncio 还很陌生,所以很抱歉我之前的无知。我在文档的事件循环部分遇到困难,并且认为我的以下代码不是异步执行的。我正在尝试通过 itertools
和 POST 将两个列表的所有组合输出到 XML。使用 requests
模块时 here 列出了一个更完整的版本,但这并不理想,因为我一次可能需要 POST 1000 多个请求。这是它现在的样子的示例:
import aiohttp
import asyncio
import itertools
skillid = ['7715','7735','7736','7737','7738','7739','7740','7741','7742','7743','7744','7745','7746','7747','7748' ,'7749','7750','7751','7752','7753','7754','7755','7756','7757','7758','7759','7760','7761','7762','7763','7764','7765','7766','7767','7768','7769','7770','7771','7772','7773','7774','7775','7776','7777','7778','7779','7780','7781','7782','7783','7784']
agent= ['5124','5315','5331','5764','6049','6076','6192','6323','6669','7690','7716']
url = 'https://url'
user = 'user'
password = 'pass'
headers = {
'Content-Type': 'application/xml'
}
async def main():
async with aiohttp.ClientSession() as session:
for x in itertools.product(agent,skillid):
payload = "<operation><operationType>update</operationType><refURLs><refURL>/unifiedconfig/config/agent/" + x[0] + "</refURL></refURLs><changeSet><agent><skillGroupsRemoved><skillGroup><refURL>/unifiedconfig/config/skillgroup/" + x[1] + "</refURL></skillGroup></skillGroupsRemoved></agent></changeSet></operation>"
async with session.post(url,auth=aiohttp.BasicAuth(user, password), data=payload,headers=headers) as resp:
print(resp.status)
print(await resp.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我看到可以使用协程,但不确定是否适用,因为只有一个任务要执行。任何澄清表示赞赏。
因为您正在提出请求,然后立即 await
-ing 处理它,所以您一次只提出一个请求。如果要并行化所有内容,则需要将发出请求与等待响应分开,并且需要使用 asyncio.gather
之类的东西来批量等待请求。
在下面的示例中,我修改了您的代码以连接到本地 httpbin 实例进行测试;我正在向 /delay/<value>
端点发出请求,以便每个请求都需要随机的时间来完成。
这里的操作理论是:
将请求代码移到异步one_request
函数中,
我们用它来构建任务数组。
使用asyncio.gather
一次性完成运行所有任务。
one_request
函数returns一个(agent, skillid, response)
元组,这样当我们遍历响应时,我们可以分辨出哪个
参数组合产生了给定的响应。
import aiohttp
import asyncio
import itertools
import random
skillid = [
"7715", "7735", "7736", "7737", "7738", "7739", "7740", "7741", "7742",
"7743", "7744", "7745", "7746", "7747", "7748", "7749", "7750", "7751",
"7752", "7753", "7754", "7755", "7756", "7757", "7758", "7759", "7760",
"7761", "7762", "7763", "7764", "7765", "7766", "7767", "7768", "7769",
"7770", "7771", "7772", "7773", "7774", "7775", "7776", "7777", "7778",
"7779", "7780", "7781", "7782", "7783", "7784",
]
agent = [
"5124", "5315", "5331", "5764", "6049", "6076", "6192", "6323", "6669",
"7690", "7716",
]
user = 'user'
password = 'pass'
headers = {
'Content-Type': 'application/xml'
}
async def one_request(session, agent, skillid):
# I'm setting `url` here because I want a random parameter for
# reach request. You would probably just set this once globally.
delay = random.randint(0, 10)
url = f'http://localhost:8787/delay/{delay}'
payload = (
"<operation>"
"<operationType>update</operationType>"
"<refURLs>"
f"<refURL>/unifiedconfig/config/agent/{agent}</refURL>"
"</refURLs>"
"<changeSet>"
"<agent>"
"<skillGroupsRemoved><skillGroup>"
f"<refURL>/unifiedconfig/config/skillgroup/{skillid}</refURL>"
"</skillGroup></skillGroupsRemoved>"
"</agent>"
"</changeSet>"
"</operation>"
)
# This shows when the task actually executes.
print('req', agent, skillid)
async with session.post(
url, auth=aiohttp.BasicAuth(user, password),
data=payload, headers=headers) as resp:
return (agent, skillid, await resp.text())
async def main():
tasks = []
async with aiohttp.ClientSession() as session:
# Add tasks to the `tasks` array
for x in itertools.product(agent, skillid):
task = asyncio.ensure_future(one_request(session, x[0], x[1]))
tasks.append(task)
print(f'making {len(tasks)} requests')
# Run all the tasks and wait for them to complete. Return
# values will end up in the `responses` list.
responses = await asyncio.gather(*tasks)
# Just print everything out.
print(responses)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
以上代码产生了大约 561 个请求,运行s 产生了大约 30 个
我引入的随机延迟秒数。
此代码 运行 一次发送所有请求。如果你想限制
最大并发请求数,你可以引入一个
Semaphore
使 one_request
在有太多活动请求时阻塞。
如果您想在响应到达时对其进行处理,而不是
等待一切完成,你可以调查
asyncio.wait
方法。
我对 aiohttp 和 asyncio 还很陌生,所以很抱歉我之前的无知。我在文档的事件循环部分遇到困难,并且认为我的以下代码不是异步执行的。我正在尝试通过 itertools
和 POST 将两个列表的所有组合输出到 XML。使用 requests
模块时 here 列出了一个更完整的版本,但这并不理想,因为我一次可能需要 POST 1000 多个请求。这是它现在的样子的示例:
import aiohttp
import asyncio
import itertools
skillid = ['7715','7735','7736','7737','7738','7739','7740','7741','7742','7743','7744','7745','7746','7747','7748' ,'7749','7750','7751','7752','7753','7754','7755','7756','7757','7758','7759','7760','7761','7762','7763','7764','7765','7766','7767','7768','7769','7770','7771','7772','7773','7774','7775','7776','7777','7778','7779','7780','7781','7782','7783','7784']
agent= ['5124','5315','5331','5764','6049','6076','6192','6323','6669','7690','7716']
url = 'https://url'
user = 'user'
password = 'pass'
headers = {
'Content-Type': 'application/xml'
}
async def main():
async with aiohttp.ClientSession() as session:
for x in itertools.product(agent,skillid):
payload = "<operation><operationType>update</operationType><refURLs><refURL>/unifiedconfig/config/agent/" + x[0] + "</refURL></refURLs><changeSet><agent><skillGroupsRemoved><skillGroup><refURL>/unifiedconfig/config/skillgroup/" + x[1] + "</refURL></skillGroup></skillGroupsRemoved></agent></changeSet></operation>"
async with session.post(url,auth=aiohttp.BasicAuth(user, password), data=payload,headers=headers) as resp:
print(resp.status)
print(await resp.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我看到可以使用协程,但不确定是否适用,因为只有一个任务要执行。任何澄清表示赞赏。
因为您正在提出请求,然后立即 await
-ing 处理它,所以您一次只提出一个请求。如果要并行化所有内容,则需要将发出请求与等待响应分开,并且需要使用 asyncio.gather
之类的东西来批量等待请求。
在下面的示例中,我修改了您的代码以连接到本地 httpbin 实例进行测试;我正在向 /delay/<value>
端点发出请求,以便每个请求都需要随机的时间来完成。
这里的操作理论是:
将请求代码移到异步
one_request
函数中, 我们用它来构建任务数组。使用
asyncio.gather
一次性完成运行所有任务。one_request
函数returns一个(agent, skillid, response)
元组,这样当我们遍历响应时,我们可以分辨出哪个 参数组合产生了给定的响应。
import aiohttp
import asyncio
import itertools
import random
skillid = [
"7715", "7735", "7736", "7737", "7738", "7739", "7740", "7741", "7742",
"7743", "7744", "7745", "7746", "7747", "7748", "7749", "7750", "7751",
"7752", "7753", "7754", "7755", "7756", "7757", "7758", "7759", "7760",
"7761", "7762", "7763", "7764", "7765", "7766", "7767", "7768", "7769",
"7770", "7771", "7772", "7773", "7774", "7775", "7776", "7777", "7778",
"7779", "7780", "7781", "7782", "7783", "7784",
]
agent = [
"5124", "5315", "5331", "5764", "6049", "6076", "6192", "6323", "6669",
"7690", "7716",
]
user = 'user'
password = 'pass'
headers = {
'Content-Type': 'application/xml'
}
async def one_request(session, agent, skillid):
# I'm setting `url` here because I want a random parameter for
# reach request. You would probably just set this once globally.
delay = random.randint(0, 10)
url = f'http://localhost:8787/delay/{delay}'
payload = (
"<operation>"
"<operationType>update</operationType>"
"<refURLs>"
f"<refURL>/unifiedconfig/config/agent/{agent}</refURL>"
"</refURLs>"
"<changeSet>"
"<agent>"
"<skillGroupsRemoved><skillGroup>"
f"<refURL>/unifiedconfig/config/skillgroup/{skillid}</refURL>"
"</skillGroup></skillGroupsRemoved>"
"</agent>"
"</changeSet>"
"</operation>"
)
# This shows when the task actually executes.
print('req', agent, skillid)
async with session.post(
url, auth=aiohttp.BasicAuth(user, password),
data=payload, headers=headers) as resp:
return (agent, skillid, await resp.text())
async def main():
tasks = []
async with aiohttp.ClientSession() as session:
# Add tasks to the `tasks` array
for x in itertools.product(agent, skillid):
task = asyncio.ensure_future(one_request(session, x[0], x[1]))
tasks.append(task)
print(f'making {len(tasks)} requests')
# Run all the tasks and wait for them to complete. Return
# values will end up in the `responses` list.
responses = await asyncio.gather(*tasks)
# Just print everything out.
print(responses)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
以上代码产生了大约 561 个请求,运行s 产生了大约 30 个 我引入的随机延迟秒数。
此代码 运行 一次发送所有请求。如果你想限制
最大并发请求数,你可以引入一个
Semaphore
使 one_request
在有太多活动请求时阻塞。
如果您想在响应到达时对其进行处理,而不是
等待一切完成,你可以调查
asyncio.wait
方法。