How can I speed up parsing with aiohttp workers queue?
I need to collect 256 URLs from the main page; those pages contain another 653, and those 653 contain roughly 11,000 more. I already collected the 653 with help here. Help me add an async worker_iso(q) function that collects the 11,000 links from the 653 pages - my code is very slow and I am struggling. Sorry, I have very little experience with asyncio, so any help would be greatly appreciated. Update: I got an exception with @Andrej Kesely's code. Thanks.
import asyncio
import aiohttp
import tqdm
from bs4 import BeautifulSoup

out = []
iso_standart = []


async def get_soup(session, url):
    async with session.get(url=url) as resp:
        return BeautifulSoup(await resp.text(), "lxml")


async def worker(session, q):
    while True:
        url, link_name, title = await q.get()
        soup = await get_soup(session, url)
        links = soup.select('[data-title="Subcommittee"] a')
        if links:
            for a in links:
                out.append("https://www.iso.org" + a["href"])
        else:
            out.append(url)
        q.task_done()


async def worker_iso(q):
    for urls in out:
        while True:
            response = await q.get(urls)
            soup = BeautifulSoup(await response.text(), "lxml")
            for i in soup.find_all('tr', {'ng-show': 'pChecked || pChecked == null'}):
                a1 = i.find('a').attrs['href']
                print(a1)
                iso_standarts = f'https://www.iso.org{a1}'
                iso_standart.append(iso_standarts)
            q.task_done()


async def main():
    url = "https://www.iso.org/standards-catalogue/browse-by-tc.html"

    async with aiohttp.ClientSession() as session:
        soup = await get_soup(session, url)

        titles = soup.select('td[data-title="Title"]')
        links = soup.select('td[data-title="Committee"] a')

        committees = []
        for a, t in zip(links, titles):
            committees.append(
                [
                    "https://www.iso.org" + a["href"],
                    a.get_text(strip=True),
                    t.get_text(strip=True),
                ]
            )

        queue = asyncio.Queue(maxsize=16)

        tasks = []
        # create 16 workers that will process data in parallel
        for i in range(16):
            task = asyncio.create_task(worker(session, queue))
            tasks.append(task)

        # put some data to worker queue
        for c in tqdm.tqdm(committees):
            await queue.put(c)

        # wait for all data to be processed
        await queue.join()

        # cancel all worker tasks
        for task in tasks:
            task.cancel()

        # Wait until all worker tasks are cancelled.
        for i in range(16):
            task_iso = asyncio.create_task(worker_iso(queue))
            tasks.append(task_iso)

        await asyncio.gather(*tasks, return_exceptions=True)

    print(len(out))


if __name__ == "__main__":
    asyncio.run(main())
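The trick is to keep a fixed pool of long-running worker tasks that all read from one shared asyncio.Queue, put every URL on the queue, wait for queue.join(), and only then cancel the workers. Reduced to a minimal sketch (the worker body and the range(100) items are placeholders, not the real scraper), the pattern looks like this:

import asyncio


async def worker(q):
    # each worker pulls items off the shared queue until it is cancelled
    while True:
        item = await q.get()
        # ... process item here ...
        q.task_done()


async def main():
    q = asyncio.Queue(maxsize=16)
    # a fixed pool of 16 workers running concurrently
    workers = [asyncio.create_task(worker(q)) for _ in range(16)]
    for item in range(100):  # placeholder work items
        await q.put(item)
    await q.join()  # wait until every queued item has been processed
    for w in workers:
        w.cancel()  # the workers loop forever, so cancel them explicitly
    await asyncio.gather(*workers, return_exceptions=True)


asyncio.run(main())

Your worker_iso breaks this pattern in two ways: Queue.get() takes no arguments and returns whatever was put on the queue (a URL string, not a response object), and iterating over out inside every worker means each worker tries to process all of the URLs. The version below gives worker_iso the same shape as worker and simply runs the two levels as two phases over the same queue.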
This script will get the next ~20k links into the iso_standards list (in about a minute):
import tqdm
import asyncio
import aiohttp
from bs4 import BeautifulSoup

out = []
iso_standards = []


async def get_soup(session, url):
    async with session.get(url=url) as resp:
        return BeautifulSoup(await resp.text(), "lxml")


async def worker(session, q):
    while True:
        url, link_name, title = await q.get()
        soup = await get_soup(session, url)
        links = soup.select('[data-title="Subcommittee"] a')
        if links:
            for a in links:
                out.append("https://www.iso.org" + a["href"])
        else:
            out.append(url)
        q.task_done()


async def worker_iso(session, q):
    while True:
        url = await q.get()
        soup = await get_soup(session, url)
        for i in soup.find_all(
            "tr", {"ng-show": "pChecked || pChecked == null"}
        ):
            a1 = i.find("a").attrs["href"]
            iso_standards.append(f"https://www.iso.org{a1}")
        q.task_done()


async def main():
    url = "https://www.iso.org/standards-catalogue/browse-by-tc.html"

    async with aiohttp.ClientSession() as session:
        soup = await get_soup(session, url)

        titles = soup.select('td[data-title="Title"]')
        links = soup.select('td[data-title="Committee"] a')

        committees = []
        for a, t in zip(links, titles):
            committees.append(
                [
                    "https://www.iso.org" + a["href"],
                    a.get_text(strip=True),
                    t.get_text(strip=True),
                ]
            )

        queue = asyncio.Queue(maxsize=16)

        # Phase 1 - Get 653 links:
        tasks = []
        # create 16 workers that will process data in parallel
        for i in range(16):
            task = asyncio.create_task(worker(session, queue))
            tasks.append(task)

        # put some data to worker queue
        for c in tqdm.tqdm(committees):
            await queue.put(c)

        # wait for all data to be processed
        await queue.join()

        # cancel all worker tasks
        for task in tasks:
            task.cancel()

        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)

        # Phase 2 - Get next 18096 links:
        tasks = []
        # create 16 workers that will process data in parallel
        for i in range(16):
            task = asyncio.create_task(worker_iso(session, queue))
            tasks.append(task)

        # put some data to worker queue
        for c in tqdm.tqdm(out):
            await queue.put(c)

        # wait for all data to be processed
        await queue.join()

        # cancel all worker tasks
        for task in tasks:
            task.cancel()

        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)

    print(len(iso_standards))


if __name__ == "__main__":
    asyncio.run(main())
Prints:
100%|██████████████████████████████████████████████████████████████████| 256/256 [00:18<00:00, 13.99it/s]
100%|██████████████████████████████████████████████████████████████████| 653/653 [00:42<00:00, 15.47it/s]
21138
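Regarding the exception mentioned in the update: if a request inside a worker raises (for example a network error or timeout), that worker dies, the error is swallowed by gather(..., return_exceptions=True), and task_done() is never called for the item the worker was holding, so queue.join() can hang. A possible hardening is sketched below; the name worker_iso_safe and the retry comment are my own suggestion, not part of the answer above, and it assumes the same get_soup and iso_standards as the script above.

async def worker_iso_safe(session, q):
    # drop-in variant of worker_iso: one bad URL no longer kills the worker,
    # and task_done() is always called so queue.join() cannot hang
    while True:
        url = await q.get()
        try:
            soup = await get_soup(session, url)
            for i in soup.find_all("tr", {"ng-show": "pChecked || pChecked == null"}):
                a1 = i.find("a").attrs["href"]
                iso_standards.append(f"https://www.iso.org{a1}")
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"failed {url}: {e!r}")  # or put the url back on the queue to retry
        finally:
            q.task_done()

The same guard can be applied to worker in phase 1.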