How to speed up async requests in Python

I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million at once, I tried downloading them in parts of 10 million using the code below, but it only handles 20,000 at a time (anything more than that raises an error), so downloading that much data becomes very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at this rate:

100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]

How can I speed it up?

import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i


n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
    print("error")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Traceback (most recent call last):

File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
    raise exceptions[0]
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
    sock = await self._connect_sock(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
    future.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
    ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 136, in <module>
    loop.run_until_complete(fetch())
  File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
    result = coro.send(None)
  File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 88, in fetch
    response = await f
  File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 37, in _wait_for_one
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 125, in do_get
    async with session.get(url + str(x), headers=headers) as response:
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
    self._resp = await self._coro
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
    conn = await self._connector.connect(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
    raise last_exc
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]

If bandwidth is not what is limiting you (but I cannot check that), there is a solution that is simpler than celery and rabbitmq, although it does not scale as well: it will be limited by the number of CPU cores you have.

Instead of splitting the calls across celery workers, split them across several processes.

I modified the fetch function as follows:

async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]

And I modified the main routine:

import concurrent.futures
from itertools import count

def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except:
        print("error")


if __name__ == '__main__':

    s = time.perf_counter()
    # Change this value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # duplicated end indexes do not matter, thanks to the use of
            # range in the make_numbers function.
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")

Here are the results I get (with the value of q set to 10_000):

1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.

I did not work on the tqdm progress bar; with the current solution, two progress bars will be displayed (but I think tqdm works fine with multiprocessing).
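If you want one clean bar per process, tqdm's position argument is one way to get non-overlapping bars. This is a hypothetical sketch, since one_executor and fetch above do not take a position argument, and fully clean output across processes may additionally require tqdm's locking:

def one_executor(start, end, position):
    # hypothetical variant: each worker gets its own progress-bar row
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end, position))
    except Exception as exc:
        print(f"error: {exc}")

# inside fetch(), forward the row index to tqdm:
#     tqdm.tqdm(asyncio.as_completed(post_tasks),
#               total=len(post_tasks), position=position)

# and pass a distinct index when submitting each worker:
#     executor.submit(one_executor, index_min, index_max, worker_index)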

Bottleneck: number of simultaneous connections

First, the bottleneck is the total number of simultaneous connections in the TCP connector.

aiohttp.TCPConnector defaults to limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:

The time taken should drop significantly. (On macOS: q = 20_000 dropped 43% from 58 to 33 seconds, and q = 10_000 dropped 42% from 31 to 18 seconds.)

The limit you can configure depends on how many file descriptors your machine can open. (On macOS: run ulimit -n to check it, and ulimit -n 1024 to raise it to 1024 for the current terminal session, then change to limit=1000. Compared with limit=100, q = 20_000 dropped 76% to 14 seconds and q = 10_000 dropped 71% to 9 seconds.)
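If you would rather inspect or raise the file-descriptor limit from Python instead of the shell, the standard-library resource module (available on macOS and Linux, not on Windows) is one option. A minimal sketch; the hard limit enforced by your OS still applies:

import resource

# current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft, hard)

# raise the soft limit, roughly what running ulimit -n 1024 does for a shell session
resource.setrlimit(resource.RLIMIT_NOFILE, (min(1024, hard), hard))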

Supporting 50 million requests: async generators

Second, the only reason 50 million requests appear to hang is their sheer number.

Merely creating the 10 million coroutines in post_tasks takes 68-98 seconds (it varies a lot on my machine), and the event loop is then further burdened with that many tasks, 99.99% of which are blocked waiting on the TCP connection pool.

We can defer the creation of the coroutines with an async generator:

async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)

We need a counterpart of asyncio.as_completed() that works with the async generator and a given concurrency:

from asyncio import ensure_future, events
from asyncio.queues import Queue

def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    while todo or not done.empty():            # + (also drain results that completed in between)
        yield _wait_for_one()                  # +
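Note that the loop.run_until_complete(_add_next()) calls re-enter the event loop from inside fetch(), which only works because the original script calls nest_asyncio.apply(). If you would rather not depend on nest_asyncio, a hypothetical variant (not part of the original answer) is to make the helper itself an async generator and await the seeding directly; fetch() would then consume it with async for instead of awaiting yielded coroutines:

async def as_completed_for_async_gen_v2(fs_async_gen, concurrency):
    done = asyncio.Queue()
    loop = asyncio.get_event_loop()
    todo = set()

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())

    async def _add_next():
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = asyncio.ensure_future(f)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # seed the first batch of tasks without re-entering the loop
    for _ in range(concurrency):
        await _add_next()
    # keep yielding while anything is in flight or already completed
    while todo or not done.empty():
        f = await done.get()
        yield f.result()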

Then, we update fetch():

from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000

async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                                # -
        # # prepare the coroutines that post                             # -
        # async for x in make_numbers(n, q):                             # -
        #     post_tasks.append(do_get(session, url, x))                 # -
        # Prepare the coroutines generator                               # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                                                                         # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]     # -
        # Now execute them with a specified concurrency                                                        # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +

Other limitations

With the above, the program can start processing 50 million requests, but:

  1. it would still take around 8 hours with CONCURRENCY = 1000, according to tqdm's estimate.
  2. your program may run out of memory holding responses and crash.

For point 2, you should probably do:

# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f
    
    # Do something with response, such as writing to a local file
    # ...
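For example, a minimal sketch that streams each response to a newline-delimited text file instead of accumulating everything in a list (the filename is a placeholder, and the blocking write is assumed to be cheap relative to the network calls):

# inside fetch(), replacing the list comprehension above
with open("responses.txt", "a", encoding="utf-8") as out:
    for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
        response = await f
        out.write(response + "\n")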

Bug in the code

do_get() should return the data:

async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +
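As a final aside, the traceback in the question (a ClientConnectorError caused by WinError 121) suggests that, at this scale, individual connections will occasionally fail, so a retry wrapper around do_get() may also help. A rough sketch with an arbitrary retry count and backoff:

async def do_get_with_retry(session, url, x, retries=3):
    for attempt in range(retries):
        try:
            return await do_get(session, url, x)
        except aiohttp.ClientError:
            if attempt == retries - 1:
                raise
            # simple linear backoff before retrying; tune as needed
            await asyncio.sleep(1 + attempt)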