Combining semaphore and time limiting in python-trio with asks http request
I'm trying to use Python asynchronously to speed up my requests to a server. The server is slow to respond (often several seconds, but sometimes faster than a second), but parallelizes well. I have no access to this server and can't change anything about it. So, I have a big list of URLs (pages in the code below) which I know in advance, and want to speed up their loading by making NO_TASKS=5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between each request (i.e. a limit of 1 request per second).
So far I have successfully implemented the semaphore part (five requests at a time) using a Trio queue.
import asks
import time
import trio

NO_TASKS = 5

asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []

pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]

async def async_load_page(url):
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)

async def producer(url):
    await queue.put(url)

async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)

async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)

start = time.time()
trio.run(main)
However, I'm missing the implementation of the limiting part, i.e. of max. 1 request per second. You can see my attempt to do so above (the first five lines of async_load_page), but as you can see when you execute the code, this is not working:
start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty
I spent some time searching for answers but couldn't find any.
On each entry into async_load_page, you need to increase next_request_at by 1. Try using next_request_at = max(trio.current_time() + 1, next_request_at + 1). Also I think you only need to set it once. You may get into trouble if you set it around the await, since while you are waiting, other tasks get a chance to change it before you check it again.
IMHO, using trio.current_time() for this is way too complicated.
The easiest way to do rate limiting is a rate limiter, i.e. a separate task that basically does this:
async def ratelimit(queue, tick, task_status=trio.TASK_STATUS_IGNORED):
    with trio.open_cancel_scope() as scope:
        task_status.started(scope)
        while True:
            await queue.put(None)
            await trio.sleep(tick)
Example use:
async with trio.open_nursery() as nursery:
    q = trio.Queue(0)  # can use >0 for burst modes
    limiter = await nursery.start(ratelimit, q, 1)
    while whatever:
        await q.get()  # will return at most once per second
        do_whatever()
    limiter.cancel()
In other words, you start that task with

q = trio.Queue(0)
limiter = await nursery.start(ratelimit, q, 1)

and then you can be sure that at most one call of

await q.get()

per second will return, because the zero-length queue acts as a rendezvous point. When done, call

limiter.cancel()

to stop the rate limiting task, otherwise your nursery won't exit.
If your use case includes starting sub-tasks which you need to finish before the limiter gets cancelled, the easiest way is to flush them in another nursery, i.e. instead of

while whatever:
    await q.get()  # will return at most once per second
    do_whatever()
limiter.cancel()
you'd use something like

async with trio.open_nursery() as inner_nursery:
    await start_tasks(inner_nursery, q)
limiter.cancel()

which would wait for the tasks to finish before touching the limiter.
NB: You can easily adapt this for "burst" mode, i.e. allow a certain number of requests before the rate limiting kicks in, simply by increasing the queue's length.
One of the ways to achieve your goal would be using a mutex acquired by a worker before sending a request and released in a separate task after some interval:
async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)
If a worker gets a response earlier than throttle seconds have passed, it will block on await mutex.acquire(). Otherwise the mutex will be released by tick and another worker will be able to acquire it.
This is similar to how the leaky bucket algorithm works:
- Workers waiting for the mutex are like water in the bucket.
- Each tick is like the bucket leaking at a constant rate.
If you add some logging before sending a request, you should get an output similar to this:
0.00169 started
0.001821 n_workers: 5
0.001833 throttle: 1
0.002152 fetching https://httpbin.org/delay/4
1.012 fetching https://httpbin.org/delay/2
2.014 fetching https://httpbin.org/delay/2
3.017 fetching https://httpbin.org/delay/3
4.02 fetching https://httpbin.org/delay/0
5.022 fetching https://httpbin.org/delay/2
6.024 fetching https://httpbin.org/delay/2
7.026 fetching https://httpbin.org/delay/3
8.029 fetching https://httpbin.org/delay/0
9.031 fetching https://httpbin.org/delay/0
10.61 finished
Motivation and origin of this solution
Some months have passed since I asked this question. Python has improved since then, and so has trio (and my knowledge of them). So I thought it was time for a little update using Python 3.6 with type annotations and trio-0.10 memory channels.
I developed my own improvement of the original version, but after reading @Roman Novatorov's great solution, I adapted it again and this is the result. Kudos to him for the main structure of the function (and the idea to use httpbin.org for illustration). I chose to use memory channels instead of a mutex to be able to take any token re-release logic out of the worker.
Explanation of the solution
I can rephrase the original problem like this:
- I want to have a number of workers that start requests independently of each other (thus, they are realized as async functions).
- There is zero or one token released at any point; any worker starting a request to the server consumes a token, and the next token will not be issued until a minimum time has passed. In my solution, I use trio's memory channels to coordinate between the token issuer and the token consumers (workers).
In case you're not familiar with memory channels and their syntax, you can read about them in the trio doc. I think the logic of async with memory_channel and memory_channel.clone() can be confusing at first.
from typing import List, Iterator

import asks
import trio

asks.init('trio')

links: List[str] = [
    'https://httpbin.org/delay/7',
    'https://httpbin.org/delay/6',
    'https://httpbin.org/delay/4'
] * 3

async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):

    async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):
        async with token_sender:
            for _ in range(number_tokens):
                await token_sender.send(None)
                await trio.sleep(1 / throttle_rate)

    async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):
        async with token_receiver:
            for url in url_iterator:
                await token_receiver.receive()
                print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')
                response = await asks.get(url)
                # print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')
                responses.append(response)

    responses = []
    url_iterator = iter(urls)
    token_send_channel, token_receive_channel = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        async with token_receive_channel:
            nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))
            for _ in range(number_workers):
                nursery.start_soon(worker, url_iterator, token_receive_channel.clone())

    return responses

responses = trio.run(fetch_urls, links, 5, 1.)
Example of the logging output:
As you can see, the minimum time between all page requests is one second:
[177878.99] Start loading link: https://httpbin.org/delay/7
[177879.99] Start loading link: https://httpbin.org/delay/6
[177880.99] Start loading link: https://httpbin.org/delay/4
[177881.99] Start loading link: https://httpbin.org/delay/7
[177882.99] Start loading link: https://httpbin.org/delay/6
[177886.20] Start loading link: https://httpbin.org/delay/4
[177887.20] Start loading link: https://httpbin.org/delay/7
[177888.20] Start loading link: https://httpbin.org/delay/6
[177889.44] Start loading link: https://httpbin.org/delay/4
Comments on the solution
As is not untypical for asynchronous code, this solution does not preserve the original order of the requested urls. One way to resolve this is to associate an id with the original url, e.g. with a tuple structure, put the responses into a response dictionary, and afterwards grab the responses one after the other to put them into a response list (saves sorting and has linear complexity).
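A hedged sketch of that re-ordering idea (the names and the synchronous stand-in for the workers are illustrative only):

```python
from typing import Dict, List

def restore_order(urls: List[str], responses_by_index: Dict[int, str]) -> List[str]:
    # Rebuild the response list in the original url order: linear time, no sorting.
    return [responses_by_index[i] for i in range(len(urls))]

# Workers would consume (index, url) tuples and fill the dict as responses
# arrive, in whatever order that happens to be:
urls = ['u0', 'u1', 'u2']
responses_by_index = {}
for index, url in reversed(list(enumerate(urls))):  # simulate out-of-order arrival
    responses_by_index[index] = f'response for {url}'

ordered = restore_order(urls, responses_by_index)
```

Each worker only needs to know the index it was handed; no coordination between workers is required to restore the order at the end.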