Parallelize checking of dead URLs

The question is simple: is it possible to test a list of URLs with an async function and store only the dead URLs (response code > 400) in a list?

I used to do this with the requests library and it worked fine, but I have a huge batch of URLs to test, and doing it sequentially takes over an hour.

I've seen plenty of articles on how to make parallel requests with asyncio and aiohttp, but not many on how to use those libraries to test URLs.

Is it possible?
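
For context, the sequential requests version presumably looks something like the sketch below (the urls list, the 10-second timeout and the dead_urls name are placeholders of mine); every request blocks until it finishes, which is why a large list takes so long:

import requests

urls = ['url1', 'url2']  # placeholder URLs, just like in the answers below

dead_urls = []
for url in urls:
    try:
        # every request blocks until it finishes, so the total time adds up quickly
        r = requests.get(url, timeout=10)
        if r.status_code > 400:
            dead_urls.append(url)
    except requests.exceptions.RequestException:
        # hosts that cannot be reached never return a status code at all
        dead_urls.append(url)

print(dead_urls)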

You can do something like this with aiohttp and asyncio.

It could probably be made more Pythonic, I guess, but this should work.

import aiohttp
import asyncio

urls = ['url1', 'url2']


async def test_url(session, url):
    async with session.get(url) as resp:
        if resp.status > 400:
            return url



async def main():

    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(asyncio.ensure_future(test_url(session, url)))
        results = await asyncio.gather(*tasks)
        # gather returns None for every URL that responded OK, so filter those out
        dead_urls = [url for url in results if url is not None]
        print(dead_urls)
        
asyncio.run(main())
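
One caveat for the aiohttp answers in general: if a host does not resolve or refuses the connection, session.get raises aiohttp.ClientError instead of returning a status code, and the whole gather call fails. A variant of test_url that also counts those failures as dead could look like this (the 10-second timeout is my own arbitrary choice):

async def test_url(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            if resp.status > 400:
                return url
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # DNS failures, refused connections and timeouts are treated as dead too
        return url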

A very basic example, but this is how I would approach it:

from aiohttp import ClientSession
from asyncio import create_task, gather, run

async def TestUrl(url, session):
    async with session.get(url) as response:
        if response.status >= 400:
            r = await response.text()
            print(f"Site: {url} is dead, response code: {str(response.status)} response text: {r}")

async def TestUrls(urls):
    resultsList: list = []
    async with ClientSession() as session:
        # Maybe some rate limiting?
        partitionTasks: list = [
             create_task(TestUrl(url, session))
             for url in urls]
        resultsList.append(await gather(*partitionTasks, return_exceptions=False))
    # do stuff with the results or return?
    return resultsList

async def main():
    urls = []
    test = await TestUrls(urls)

if __name__ == "__main__":
    run(main())
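
About the "# Maybe some rate limiting?" comment above: one common way to do it (a sketch of mine, reusing TestUrl and ClientSession from the answer; the limit of 20 concurrent requests is arbitrary) is an asyncio.Semaphore that caps how many requests run at once:

from aiohttp import ClientSession
from asyncio import Semaphore, create_task, gather

async def TestUrlsLimited(urls, limit: int = 20):
    # allow at most `limit` requests in flight at any moment
    limiter = Semaphore(limit)

    async def limited(url, session):
        async with limiter:
            await TestUrl(url, session)

    async with ClientSession() as session:
        await gather(*(create_task(limited(url, session)) for url in urls))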

Try using ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import requests

url_list=[
    "https://www.google.com",
    "https://www.adsadasdad.com",
    "https://www.14fsdfsff.com",
    "https://www.ggr723tg.com",
    "https://www.yyyyyyyyyyyyyyy.com",
    "https://www.78sdf8sf5sf45sf.com",
    "https://www.wikipedia.com",
    "https://www.464dfgdfg235345.com",
    "https://www.tttllldjfh.com",
    "https://www.qqqqqqqqqq456.com"
]

def check(url):
    try:
        r = requests.get(url, timeout=10)
        if r.status_code < 400:
            print(f"{url} is ALIVE")
        else:
            print(f"{url} is DEAD (status {r.status_code})")
    except requests.exceptions.RequestException as e:
        # domains that do not resolve raise an exception instead of returning a status
        print(f"{url} is DEAD ({e.__class__.__name__})")

with ThreadPoolExecutor(max_workers=5) as e:
    for url in url_list:
        e.submit(check, url)

Using multithreading you could do it like this:

import requests
from concurrent.futures import ThreadPoolExecutor

results = dict()

# test the given url 
# add url and status code to the results dictionary if GET succeeds but status code >= 400
# also add url to results dictionary if an exception arises with full exception details
def test_url(url):
    try:
        r = requests.get(url)
        if r.status_code >= 400:
            results[url] = f'{r.status_code=}'
    except requests.exceptions.RequestException as e:
        results[url] = str(e)

# return a list of URLs to be checked. probably get these from a file in reality
def get_list_of_urls():
    return ['https://facebook.com', 'https://google.com', 'http://google.com/nonsense', 'http://goooglyeyes.org']

def main():
    with ThreadPoolExecutor() as executor:
        executor.map(test_url, get_list_of_urls())
    print(results)

if __name__ == '__main__':
    main()
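
A small variation on the same idea (my own sketch, not part of the answer above; check_url, find_dead_urls and the timeout are made-up names and values): ThreadPoolExecutor.map returns each worker's return value in input order, so you can skip the shared results dict and collect the output of map directly:

from concurrent.futures import ThreadPoolExecutor
import requests

def check_url(url):
    """Return (url, reason) if the URL looks dead, otherwise None."""
    try:
        r = requests.get(url, timeout=10)
        if r.status_code >= 400:
            return url, f'status {r.status_code}'
    except requests.exceptions.RequestException as e:
        return url, str(e)

def find_dead_urls(urls):
    with ThreadPoolExecutor() as executor:
        # map yields results in the same order as the input URLs
        return [res for res in executor.map(check_url, urls) if res is not None]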

Multiprocessing might be a better option for your problem.

from multiprocessing import Process
from multiprocessing import Manager
import requests

def checkURLStatus(url, url_status):
    try:
        res = requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        # hosts that cannot be reached at all never return a status code
        url_status[url] = "Inactive"
        return
    if res.status_code >= 400:
        url_status[url] = "Inactive"
    else:
        url_status[url] = "Active"

if __name__ == "__main__":
    urls = [
    "https://www.google.com"
    ]
    manager = Manager()
    # to store the results for later usage
    url_status = manager.dict()

    procs = []

    for url in urls:
        proc = Process(target=checkURLStatus, args=(url, url_status))
        procs.append(proc)
        proc.start()
    
    for proc in procs:
        proc.join()
    print(url_status.values())

url_status is a shared variable used to store the results from the separate processes. See this page for more details.
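
As a side note (again a sketch of my own, not from the answer): if you only need the results at the end rather than a live shared dict, a multiprocessing.Pool collects each worker's return value for you, so the Manager is not needed:

from multiprocessing import Pool
import requests

def check_url_status(url):
    """Return (url, "Active"/"Inactive") for one URL."""
    try:
        res = requests.get(url, timeout=10)
        return url, "Inactive" if res.status_code >= 400 else "Active"
    except requests.exceptions.RequestException:
        return url, "Inactive"

if __name__ == "__main__":
    urls = ["https://www.google.com"]
    with Pool(processes=4) as pool:
        # pool.map gathers each worker's return value in input order
        print(dict(pool.map(check_url_status, urls)))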