使用 asyncio 的恰好 100 个请求后,并行请求无限阻塞
Parallel requests block infinitely after exactly 100 requests using asyncio
我试过同时使用 httpx 和 aiohttp,两者都有这个硬编码限制。
import asyncio
import aiohttp
import httpx


async def main():
    # A single shared session for all 500 requests — its connection pool
    # is what imposes the (default 100) concurrent-connection cap.
    client = aiohttp.ClientSession()
    # client = httpx.AsyncClient(timeout=None)
    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        for _ in range(500)
    ]
    # Await the requests in completion order, printing a counter as each
    # one finishes.
    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())
输出-
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99
两个库都停留在 99
如果每个请求都使用新会话,则不会发生这种情况。
我做错了什么? asyncio 的全部意义不就是让事情变得如此简单吗?
我尝试用线程、zmq 和 requests 重写它,运行良好 -
import requests  # was missing from the snippet, used by worker()
import zmq
from threading import Thread  # was missing from the snippet

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    """Pull task tokens, perform one HTTP GET per token, push the body back.

    Each worker owns its own requests.Session; a falsy token from the
    ventilator is the shutdown signal.
    """
    client = requests.Session()
    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")
    while True:
        if not pull.recv_pyobj():
            return
        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        push.send_pyobj(r.content)


def ventilator():
    """Send one True per task, then one False per worker to shut them down."""
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")
    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)
    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)


# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")
for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()
问题在于 client.get(...) 返回的是一个持有操作系统级套接字句柄的响应对象。未能关闭这些对象会导致 aiohttp 耗尽可用连接——即达到连接器的并发连接上限(默认为 100)。
要解决此问题,您需要关闭 client.get() 返回的对象,或使用 async with 以确保在 with 块结束后立即关闭该对象。例如:
async def get(client):
    """Issue one GET and release the response the moment the block exits."""
    async with client.get(
        "https://query1.finance.yahoo.com/v8/finance/chart/",
        params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
    ) as resp:
        pass


async def main():
    # async with also guarantees the session itself is closed on exit.
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)


asyncio.run(main())
此外,aiohttp.ClientSession
对象也应该关闭,也可以使用async with
来完成,如上所示。
我试过同时使用 httpx 和 aiohttp,两者都有这个硬编码限制。
import asyncio
import aiohttp
import httpx


async def main():
    # A single shared session for all 500 requests — its connection pool
    # is what imposes the (default 100) concurrent-connection cap.
    client = aiohttp.ClientSession()
    # client = httpx.AsyncClient(timeout=None)
    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        for _ in range(500)
    ]
    # Await the requests in completion order, printing a counter as each
    # one finishes.
    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())
输出-
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99
两个库都停留在 99
如果每个请求都使用新会话,则不会发生这种情况。
我做错了什么? asyncio 的全部意义不就是让事情变得如此简单吗?
我尝试用线程、zmq 和 requests 重写它,运行良好 -
import requests  # was missing from the snippet, used by worker()
import zmq
from threading import Thread  # was missing from the snippet

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    """Pull task tokens, perform one HTTP GET per token, push the body back.

    Each worker owns its own requests.Session; a falsy token from the
    ventilator is the shutdown signal.
    """
    client = requests.Session()
    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")
    while True:
        if not pull.recv_pyobj():
            return
        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
        )
        push.send_pyobj(r.content)


def ventilator():
    """Send one True per task, then one False per worker to shut them down."""
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")
    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)
    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)


# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")
for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()
问题在于 client.get(...) 返回的是一个持有操作系统级套接字句柄的响应对象。未能关闭这些对象会导致 aiohttp 耗尽可用连接——即达到连接器的并发连接上限(默认为 100)。
要解决此问题,您需要关闭 client.get() 返回的对象,或使用 async with 以确保在 with 块结束后立即关闭该对象。例如:
async def get(client):
    """Issue one GET and release the response the moment the block exits."""
    async with client.get(
        "https://query1.finance.yahoo.com/v8/finance/chart/",
        params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d"},
    ) as resp:
        pass


async def main():
    # async with also guarantees the session itself is closed on exit.
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)


asyncio.run(main())
此外,aiohttp.ClientSession
对象也应该关闭,也可以使用async with
来完成,如上所示。