How to speed up async requests in Python
I want to download/scrape 50 million log records from a site. Rather than downloading all 50 million at once, I tried to download them in parts of 10 million using the code below, but it can only handle about 20,000 at a time (anything more than that throws an error), which makes downloading that much data very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at a rate of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]. How can I speed this up?
import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i

n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)

s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
    print("error")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")
Traceback (most recent call last):
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
raise exceptions[0]
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
sock = await self._connect_sock(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
await self.sock_connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
return await self._proactor.connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
future.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
value = callback(transferred, key, ov)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 136, in <module>
loop.run_until_complete(fetch())
File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
result = coro.send(None)
File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 88, in fetch
response = await f
File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 37, in _wait_for_one
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
result = coro.throw(exc)
File "C:\Users\SGM\Desktop\xnet\x3Whosebug.py", line 125, in do_get
async with session.get(url + str(x), headers=headers) as response:
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
self._resp = await self._coro
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
conn = await self._connector.connect(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
raise last_exc
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
transp, proto = await self._wrap_create_connection(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
If it isn't bandwidth that is limiting you (but I cannot check that), there is a solution that is simpler than celery and rabbitmq, although it does not scale as well: it will be limited by the number of CPUs you have.
Instead of splitting the calls across celery workers, split them across multiple processes.
I modified the fetch function as follows:
async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
And I modified the main routine:
import concurrent.futures
from itertools import count


def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except:
        print("error")


if __name__ == '__main__':
    s = time.perf_counter()

    # Change the value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # duplicated indexes do not matter, thanks to the use of
            # range in the make_numbers function.
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")
Here are the results I get (with the value of q set to 10_000):
1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.
I did not work on the tqdm progress bar; with the current solution, two progress bars are displayed (but I think tqdm works well with multiprocessing).
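One possible way to keep the bars separate is tqdm's position argument (one bar line per process). This is only a sketch; the extra position parameter threaded through fetch() and one_executor() is an assumption added for illustration, not part of the solution above.

async def fetch(start, end, position=0):
    # Sketch only: `position` is an illustrative addition so that each
    # process writes its progress bar on its own line.
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks),
                                                total=len(post_tasks),
                                                position=position)]


def one_executor(start, end, position):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end, position))
    except Exception as e:
        print("error:", e)


# In the main block, pass a distinct position per submitted chunk, e.g.:
#     executor.submit(one_executor, index_min, index_max,
#                     index_min // length_by_executor)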
Bottleneck: number of simultaneous connections
First, the bottleneck is the total number of simultaneous connections in the TCP connector. The default for aiohttp.TCPConnector is limit=100. On most systems (tested on macOS), you should be able to double that by passing a connector with limit=200:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
The time taken should decrease noticeably. (On macOS: q = 20_000 dropped 43% from 58 s to 33 s, and q = 10_000 dropped 42% from 31 s to 18 s.)
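An aside on a related knob (not from the original answer): aiohttp also has a per-host connection cap. Here every request goes to the same host, so the global limit is the one that matters; limit_per_host=0 (the default) means no per-host cap.

connector = aiohttp.TCPConnector(limit=200, limit_per_host=0)  # limit_per_host=0: no per-host cap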
How high you can configure limit depends on how many file descriptors your machine can open. (On macOS: run ulimit -n to check, and ulimit -n 1024 to raise it to 1024 for the current terminal session; then change to limit=1000. Compared with limit=100, q = 20_000 dropped 76% to 14 s, and q = 10_000 dropped 71% to 9 s.)
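If you prefer to raise the limit from inside the script rather than the shell, here is a minimal sketch using the standard resource module (Unix/macOS only; it does not exist on Windows, and 4096 is an arbitrary target that must not exceed the hard limit):

import resource

# Inspect and raise the soft file-descriptor limit for the current process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))  # 4096 is an illustrative target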
Supporting 50 million requests: async generator
Second, the reason 50 million requests appears to hang is simply the sheer number of them.
Just creating 10 million coroutines in post_tasks takes 68-98 seconds (it varies a lot on my machine), and the event loop is then further burdened with that many tasks, 99.99% of which are blocked waiting for the TCP connection pool.
We can defer the creation of the coroutines with an async generator:
async def make_async_gen(f, n, q):
    async for x in make_numbers(n, q):
        yield f(x)
We need a counterpart of asyncio.as_completed() that handles an async_gen and a concurrency:
from asyncio import ensure_future, events
from asyncio.queues import Queue


def as_completed_for_async_gen(fs_async_gen, concurrency):
    done = Queue()
    loop = events.get_event_loop()
    # todo = {ensure_future(f, loop=loop) for f in set(fs)}  # -
    todo = set()                                             # +

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)
        loop.create_task(_add_next())  # +

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    async def _add_next():  # +
        try:
            f = await fs_async_gen.__anext__()
        except StopAsyncIteration:
            return
        f = ensure_future(f, loop=loop)
        f.add_done_callback(_on_completion)
        todo.add(f)

    # for f in todo:                           # -
    #     f.add_done_callback(_on_completion)  # -
    # for _ in range(len(todo)):               # -
    #     yield _wait_for_one()                # -
    for _ in range(concurrency):               # +
        loop.run_until_complete(_add_next())   # +
    while todo:                                # +
        yield _wait_for_one()                  # +
Then, we update fetch():
from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                 # -
        # # prepare the coroutines that post              # -
        # async for x in make_numbers(n, q):              # -
        #     post_tasks.append(do_get(session, url, x))  # -
        # Prepare the coroutines generator                # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once  # -
        # responses = [await f for f in tqdm.asyncio.tqdm.as_completed(post_tasks, total=len(post_tasks))]  # -
        # Now execute them with a specified concurrency  # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
Other limitations
With the above, the program can start to process 50 million requests, but:
- According to tqdm's estimate, it will still take about 8 hours, even with CONCURRENCY = 1000 (a rough cross-check follows this list).
- Your program may run out of memory holding all the responses and crash.
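As a rough cross-check of the first point, using the earlier limit=1000 measurement (an aside, not part of the original estimate):

# ~20_000 requests took 14 s with limit=1000, i.e. on the order of 1_400 requests/s,
# so 50 million requests need roughly:
50_000_000 / (20_000 / 14)  # ≈ 35_000 s, the same order of magnitude as tqdm's ~8-hour estimate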
For the second point, you should probably do:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
    response = await f
    # Do something with response, such as writing to a local file
    # ...
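For example, here is a sketch of one way to fill in that loop body inside fetch() (the responses.jsonl file name and the JSON-lines format are assumptions for illustration, not part of the original answer; the write is a plain blocking call, which is usually acceptable for small lines, otherwise something like aiofiles could be used):

import json

# Append each body to a JSON-lines file as it completes, instead of
# accumulating everything in memory.
with open("responses.jsonl", "a", encoding="utf-8") as out:
    for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
        response = await f
        out.write(json.dumps(response) + "\n")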
Bug in the code
do_get() should return data:
async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        # print(data)  # -
        return data    # +