Python Aiohttp Asyncio: how to create delays between each task
The problem I'm trying to solve:
I'm making many API requests to a server. I'm trying to create a delay between the async API calls to comply with the server's rate-limit policy.
What I want it to do
I want it to behave like this:
- Make API request #1
- Wait 0.1 seconds
- Make API request #2
- Wait 0.1 seconds
...and so on...
- Repeat until all requests are made
- Collect the responses and return the results in one object (results)
The issue:
When I introduce asyncio.sleep() or time.sleep() into the code, it still makes the API requests almost instantaneously. It seems to delay the execution of print(), but not the API requests. I suspect I have to create the delay within the loop, rather than in fetch_one() or fetch_all(), but couldn't figure out how to do that.
Code block:
import asyncio
import aiohttp
from ssl import SSLContext

async def fetch_all(loop, urls, delay):
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):
    # time.sleep(delay)
    # asyncio.sleep(delay)
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            return await resp

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
Versions I'm using:
python 3.8.5
aiohttp 3.7.4
asyncio 3.4.3
Any tips pointing me in the right direction would be greatly appreciated!
When you use asyncio.gather, you run all the fetch_one coroutines concurrently. They all wait for the delay together, and then call the API at practically the same instant.
To fix this, you should either await fetch_one one after another inside fetch_all, or use a Semaphore to signal that the next request should not start before the previous one has finished.
The idea is as follows:
import asyncio
import aiohttp
from ssl import SSLContext

_sem = asyncio.Semaphore(1)

async def fetch_all(loop, urls, delay):
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):
    async with _sem:  # the next coroutine(s) will block here until the previous one is done
        await asyncio.sleep(delay)
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url, ssl=SSLContext()) as resp:
                # print("An api call to ", url, " is made at ", time.time())
                # print(resp)
                return await resp

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
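The first alternative mentioned above, awaiting each fetch_one sequentially inside fetch_all, can be sketched as follows. To keep the snippet self-contained, asyncio.sleep stands in for the aiohttp request; swap the body of fetch_one for the session.get(...) call from the question in actual use:

```python
import asyncio
import time

async def fetch_one(url, delay):
    # Placeholder for the real aiohttp request from the question.
    await asyncio.sleep(0)
    return url

async def fetch_all(urls, delay):
    results = []
    for url in urls:
        results.append(await fetch_one(url, delay))
        await asyncio.sleep(delay)  # pause before the next request
    return results

start = time.monotonic()
results = asyncio.run(fetch_all(["url1", "url2", "url3"], 0.1))
elapsed = time.monotonic() - start  # at least 0.3s for three requests
```

Because each request is awaited before the next one starts, the calls are strictly serialized with a pause in between, which is exactly the behavior the question describes.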
The call to asyncio.gather launches all the requests "at the same time" - on the other hand, if you simply lock or await each task one by one, you get no benefit from the parallelism at all.
If you know the rate at which you can make requests, the simplest thing to do is to increase the asynchronous pause before each successive request - a simple global variable can do that:
import asyncio
import aiohttp
from ssl import SSLContext

next_delay = 0.1

async def fetch_all(loop, urls, delay):
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):
    global next_delay
    next_delay += delay
    await asyncio.sleep(next_delay)
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            return await resp

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
Now, if you want to make 5 requests and only then the next 5, you can use a synchronization primitive - for example an asyncio.Event gating each batch on how many API calls are currently active:
import asyncio
import aiohttp
from ssl import SSLContext

active_calls = 0
MAX_CALLS = 5

async def fetch_all(loop, urls, delay):
    event = asyncio.Event()
    event.set()
    results = await asyncio.gather(*[fetch_one(loop, url, delay, event) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay, event):
    global active_calls
    await event.wait()  # blocks while the current batch is still running
    active_calls += 1
    if active_calls >= MAX_CALLS:
        event.clear()  # batch is full: hold back new calls until it drains
    try:
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url, ssl=SSLContext()) as resp:
                # print("An api call to ", url, " is made at ", time.time())
                # print(resp)
                return await resp
    finally:
        active_calls -= 1
        if active_calls == 0:
            event.set()  # batch drained: release the next batch

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
For both examples, if your design avoids global variables (and actually, these are "module" variables) - you can move all the functionality into a class and work on an instance, promoting the globals to instance attributes, or use a mutable container, such as a list holding the active_calls value in its first item, and pass it as a parameter.
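A minimal sketch of that class-based refactor could look like the following, applied to the incremental-delay example. RateLimitedFetcher is a hypothetical name, and asyncio.sleep again stands in for the real session.get(...) call:

```python
import asyncio

class RateLimitedFetcher:
    """Keeps the staggered-delay state on the instance instead of in globals."""

    def __init__(self, delay):
        self.delay = delay
        self._next_delay = 0.0  # replaces the module-level next_delay

    async def fetch_one(self, url):
        # Stagger this coroutine's start, as in the global-variable version.
        self._next_delay += self.delay
        await asyncio.sleep(self._next_delay)
        # The real aiohttp request would go here; returning the url keeps
        # the sketch self-contained.
        return url

    async def fetch_all(self, urls):
        return await asyncio.gather(
            *[self.fetch_one(url) for url in urls], return_exceptions=True
        )

fetcher = RateLimitedFetcher(delay=0.01)
results = asyncio.run(fetcher.fetch_all(["u1", "u2", "u3"]))
```

Because each fetcher instance carries its own delay counter, several independent rate limits can coexist in the same program without interfering with each other.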