asyncio web scraping 101: fetching multiple urls with aiohttp

In a previous question, one of aiohttp's authors kindly suggested using the new async with syntax from Python 3.5:

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.wait([loop.create_task(fetch(session, url))
                                  for url in urls])
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
            'http://google.com',
            'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))
        # do something with the_results

However, when one of the session.get(url) requests breaks (as above, because of http://SDFKHSKHGKLHSKLJHGSDFKSJH.com), the error is not handled and the whole thing breaks.

I tried in many ways to insert checks on the result of session.get(url), e.g. looking for places to put try ... except ... or if response.status != 200:, but I just don't understand how to work with async with, await and the various objects involved.

Since async with is still quite new, there are not many examples. It would be very helpful to a lot of people if an asyncio wizard could show how to do this; after all, fetching multiple resources concurrently is the first thing most people want to try with asyncio.

Goal

The goal is that we can inspect the_results and quickly see either of these (a sketch of the desired shape follows the list):

  • this url failed (and why: status code, maybe an exception name), or
  • this url worked, and here is a useful response object
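
Purely as an illustration of that goal, the_results could end up shaped something like this (this structure is my own assumption for the question, not anything aiohttp produces by itself):

# hypothetical shape for the_results, for illustration only
the_results = [
    ('http://SDFKHSKHGKLHSKLJHGSDFKSJH.com', 'ERR', 'Cannot connect to host ...'),
    ('http://google.com', 'OK', '<!DOCTYPE html>...'),
    ('http://twitter.com', 'OK', '<!doctype html>...'),
]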

I'm far from being an asyncio expert, but to handle the error here you need to catch the socket error:

import socket

async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                print(response.status == 200)
                return await response.text()
        except socket.error as e:
            print(e.strerror)

Running the code and printing the_results:

Cannot connect to host sdfkhskhgklhskljhgsdfksjh.com:80 ssl:False [Can not connect to sdfkhskhgklhskljhgsdfksjh.com:80 [Name or service not known]]
True
True
({<Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result='<!DOCTYPE ht...y>\n</html>\n'>, <Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result=None>, <Task finished coro=<fetch() done, defined at <ipython-input-7-535a26aaaefe>:5> result='<!doctype ht.../body></html>'>}, set())

You can see that we caught the error and the remaining calls still returned the html successfully.
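
Keep in mind that asyncio.wait returns a (done, pending) pair of Task sets rather than plain values, so a minimal sketch for unpacking the_results from the code above could look like this:

# the_results is the (done, pending) tuple returned by asyncio.wait
done, pending = the_results
for task in done:
    html = task.result()  # the value fetch() returned, or None when the error was only printed
    # do something with html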

We should probably really catch OSError instead, since socket.error has been a deprecated alias of OSError since Python 3.3:

async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                return await response.text()
        except OSError as e:
            print(e)

If you also want to check that the response is 200, put your if inside the try as well; you can use the reason attribute to get some more information:

async def fetch(session, url):
    with aiohttp.Timeout(10):
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    print(response.reason)
                return await response.text()
        except OSError as e:
            print(e.strerror)

I would use gather instead of wait; it can return exceptions as objects instead of raising them, and then you can check whether each result is an instance of some exception.

import aiohttp
import asyncio

async def fetch(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(session, urls, loop):
    results = await asyncio.gather(
        *[fetch(session, url) for url in urls],
        return_exceptions=True  # default is false, that would raise
    )

    # for testing purposes only
    # gather returns results in the order of coros
    for idx, url in enumerate(urls):
        print('{}: {}'.format(url, 'ERR' if isinstance(results[idx], Exception) else 'OK'))
    return results

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # breaks because of the first url
    urls = [
        'http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
        'http://google.com',
        'http://twitter.com']
    with aiohttp.ClientSession(loop=loop) as session:
        the_results = loop.run_until_complete(
            fetch_all(session, urls, loop))

Testing:

$ python test.py
http://SDFKHSKHGKLHSKLJHGSDFKSJH.com: ERR
http://google.com: OK
http://twitter.com: OK
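
As a possible follow-up (my own sketch, assuming the urls and the_results from the gather version above), you can pair each url with its result in order and branch on whether it is an exception:

for url, result in zip(urls, the_results):
    if isinstance(result, Exception):
        # the request failed; the exception object carries the reason
        print('{} failed: {!r}'.format(url, result))
    else:
        # the request worked; result is the html text returned by fetch()
        print('{} returned {} characters of html'.format(url, len(result)))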