基于令牌桶算法的异步信号量
Token Bucket Algorithm based async semaphore
我想实现一个异步的令牌桶算法(token bucket algorithm),用来管理何时运行任务(协程)。简而言之,它将控制在任意给定时间跨度内运行的协程数量。
我试过这个以及信号量(semaphore),但两者都不是真正的令牌桶算法。目标是在桶中还有容量时把桶耗尽(即运行协程),否则在运行下一个协程之前等待足够长的时间。
所以我决定继承并重写 Semaphore 类,以控制在任意时间段内运行的协程数量,如下所示:
TokenSemaphore.py
import datetime
from asyncio.locks import Semaphore
import asyncio
import collections
from math import floor
class TokenSemaphore(Semaphore):
    """Semaphore subclass that attempts token-bucket style throttling.

    State kept in addition to the ``asyncio.Semaphore`` base class:

    * ``_capacity``   -- tokens currently available; decremented by
      ``acquire()`` and adjusted by ``release()``.
    * ``_rate``       -- throttling parameter supplied by the caller.
    * ``_time_table`` -- bounded deque (``maxlen`` = initial capacity) of the
      timestamps recorded by ``has_capacity()``.

    NOTE(review): the throttling logic is intentionally kept as originally
    posted; its known problems are:

    * ``has_capacity()`` mutates ``_time_table`` and compares its length
      with the *live* ``_capacity``, which ``acquire()`` keeps decrementing.
    * ``has_capacity()`` treats ``_rate`` as a window in seconds while
      ``release()`` treats it as a frequency (``1 / rate``).
    * ``release()`` adds ``rate * floor(delta)`` tokens, which is 0 whenever
      less than one second has passed since the last recorded timestamp.
    * ``release()`` is a coroutine, unlike ``asyncio.Semaphore.release``.
    * The class forwards ``loop`` and uses the base class' private ``_loop``
      and ``_waiters`` attributes -- presumably written for an older
      asyncio; confirm against the targeted Python version.

    Args:
        capacity: Initial number of tokens; must be >= 0.
        rate: Throttling parameter (see the notes above).
        loop: Forwarded unchanged to ``asyncio.Semaphore``.

    Raises:
        ValueError: If ``capacity`` is negative.
    """

    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        super().__init__(value=capacity, loop=loop)

    @property
    def capacity(self):
        """Number of tokens currently available."""
        return self._capacity

    def _wake_up_next(self):
        """Resolve the oldest queued waiter that is still pending, if any."""
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def has_capacity(self):
        """Return True if a caller may proceed now, recording a timestamp.

        A caller is admitted while fewer timestamps than ``_capacity`` are
        stored, or once the oldest stored timestamp is at least ``_rate``
        seconds old.  Admission appends the current time to ``_time_table``.
        """
        if len(self._time_table) < self._capacity:
            self._time_table.append(datetime.datetime.now())
            return True
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[0]).total_seconds()
        if delta < self._rate:
            return False
        self._time_table.append(tf)
        return True

    def locked(self):
        """Return True if no tokens are left (``_capacity`` is 0)."""
        return self._capacity == 0

    async def acquire(self):
        """Wait until ``has_capacity()`` admits the caller, then take a token.

        Returns:
            True once a token has been taken.
        """
        while not self.has_capacity():
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except BaseException:
                # See the similar code in Queue.get: if this waiter was
                # already woken, pass the wake-up on before re-raising.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._capacity -= 1
        return True

    async def release(self):
        """Sleep for the remaining token interval, refill and wake a waiter.

        NOTE(review): ``result`` is 0 when under one second has elapsed
        since the newest timestamp, so ``_capacity`` is then left unchanged.
        """
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        result = self._rate * floor(delta)
        interval = 1.0 / float(self._rate)
        sleep_time = interval - result if result < interval else 0
        await asyncio.sleep(sleep_time)
        self._capacity += result
        self._wake_up_next()
注意我的 release() 是 async def,因为我认为当桶里没有足够的令牌时,需要在这里休眠(sleep)。而标准 Semaphore 的 release() 并不是 async def。我觉得这正是我出错的地方,但我不确定。
为了测试我的实现,我写了这个:
run.py
import asyncio
import aiohttp
import re
import datetime
from TokenSemaphore import TokenSemaphore
# URL requested by every worker.
SITE = "https://example.com"
async def myWorker(semaphore):
    """Fetch SITE once while holding a token from ``semaphore``.

    Prints the HTTP status, the time elapsed since the module-level ``ref``
    timestamp and the semaphore's reported capacity.

    Args:
        semaphore: Object exposing awaitable ``acquire()`` / ``release()``
            and a ``capacity`` attribute.
    """
    await semaphore.acquire()
    print("Successfully acquired the semaphore")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(SITE, verify_ssl=False) as resp:
                print(resp.status, datetime.datetime.now() - ref, semaphore.capacity)
    finally:
        # Always hand the token back, even if the request raised.
        print("Releasing Semaphore")
        await semaphore.release()
async def main(loop):
    """Run 44 workers against one shared TokenSemaphore and wait for all.

    Args:
        loop: Event loop passed in by the caller; not used by this function.
    """
    mySemaphore = TokenSemaphore(capacity=40, rate=2)
    # mySemaphore = asyncio.Semaphore(value=40)
    # asyncio.wait() needs tasks/futures; bare coroutines are rejected on
    # newer Python versions, so wrap each worker explicitly.
    tasks = [asyncio.ensure_future(myWorker(mySemaphore)) for _ in range(44)]
    await asyncio.wait(tasks)
    print("Main Coroutine")
# Reference timestamp; myWorker prints the time elapsed since this moment.
ref = datetime.datetime.now()
# Drive main() to completion on the event loop, then close the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("All Workers Completed")
loop.close()
问题
TokenSemaphore 看起来可以工作,但在还有容量时它并不会把桶耗尽。我的打印语句输出了桶的可用容量,结果表明容量是充足的(即可以运行更多任务)。我无法理解,为什么在容量充足的情况下,我的令牌信号量没有运行更多的协程。
$ python run.py
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:00.177742 20
Releasing Semaphore
200 0:00:00.178944 20
Releasing Semaphore
200 0:00:00.184608 20
Releasing Semaphore
200 0:00:01.103417 20
Releasing Semaphore
200 0:00:01.105539 22
Releasing Semaphore
200 0:00:01.106280 22
Releasing Semaphore
200 0:00:01.106929 22
Releasing Semaphore
200 0:00:01.107701 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:01.110719 29
Releasing Semaphore
200 0:00:01.111228 29
Releasing Semaphore
200 0:00:01.111801 29
Releasing Semaphore
200 0:00:01.112366 29
Releasing Semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:01.116581 25
Releasing Semaphore
200 0:00:01.153321 25
Releasing Semaphore
200 0:00:01.155235 25
Releasing Semaphore
200 0:00:01.155791 25
Releasing Semaphore
200 0:00:01.156530 25
Releasing Semaphore
200 0:00:01.157258 25
Releasing Semaphore
200 0:00:01.221712 25
Releasing Semaphore
200 0:00:01.223267 25
Releasing Semaphore
200 0:00:01.223724 25
Releasing Semaphore
200 0:00:01.224246 25
Releasing Semaphore
200 0:00:01.224745 25
Releasing Semaphore
200 0:00:01.228829 25
Releasing Semaphore
200 0:00:04.326125 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.361430 30
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.910990 29
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.440614 28
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.974999 27
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:06.516174 26
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.051482 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.601656 24
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.147306 23
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.682823 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.216370 21
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.752510 20
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.302981 19
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.843989 18
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.384492 17
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.939925 16
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:12.485116 15
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.016098 14
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.554884 13
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:14.096828 12
Releasing Semaphore
Main Coroutine
All Workers Completed
存在 3 个问题:
- 我们期望 _time_table 的长度能达到 _capacity,但 _capacity 会在 acquire 中递减。最好把对 _time_table 的修改从 has_capacity 中移出去。
- 在 release 中,result 的计算结果为 0,因此协程被唤醒后容量不会增加。只需将容量加 1 即可。
- 一般来说,应该在 acquire 而不是 release 中休眠,这样就不会在执行结束时无缘无故地等待。
看看这个,看看是否有帮助:
class TokenSemaphore(Semaphore):
    """Counting semaphore whose blocked acquirers are released at a paced rate.

    ``capacity`` tokens can be taken immediately.  Once none are left, every
    further ``acquire()`` call queues up, sleeps until its pacing slot (slots
    are spaced ``1 / rate`` seconds apart) and then waits for a ``release()``
    to hand it a token.

    Args:
        capacity: Number of tokens initially available; must be >= 0.
        rate: Tokens per second used to space out queued acquirers; must
            be > 0.
        loop: Forwarded to ``asyncio.Semaphore`` only when given; leave it
            as ``None`` on Python versions that no longer accept it.

    Raises:
        ValueError: If ``capacity`` is negative or ``rate`` is not positive.
    """

    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        if rate <= 0:
            raise ValueError("Token rate must be > 0")
        self._capacity = capacity
        # Number of coroutines currently queued inside acquire().
        self._waiting = 0
        self._rate = rate
        # Timestamps of the most recent successful acquisitions.
        self._time_table = collections.deque(maxlen=self._capacity)
        # Pacing slot reserved for the most recently queued waiter.
        self._last_token = None
        # Only forward ``loop`` when the caller supplied one: newer asyncio
        # versions reject the keyword altogether.
        if loop is None:
            super().__init__(value=capacity)
        else:
            super().__init__(value=capacity, loop=loop)
        # Own the waiter queue instead of relying on the base class, which
        # may create it lazily.
        self._waiters = collections.deque()

    @property
    def capacity(self):
        """Tokens a new caller could take: free tokens minus queued waiters."""
        return max(self._capacity - self._waiting, 0)

    def locked(self):
        """Return True if the semaphore cannot be acquired immediately."""
        return self.capacity == 0

    def _get_sleep_time(self):
        """Reserve the next pacing slot and return the seconds to sleep.

        The first waiter after an uncontended acquire is scheduled one token
        interval after the last recorded acquisition; later waiters are
        scheduled one interval after the previously reserved slot.  A waiter
        that arrives after the previous slot has passed is not delayed.
        """
        now = datetime.datetime.now()
        token_freq = datetime.timedelta(seconds=1.0 / float(self._rate))
        if self._last_token is None:
            if self._time_table:
                self._last_token = self._time_table[-1] + token_freq
            else:
                # Nothing has been acquired yet (e.g. capacity 0): no
                # previous token to space against.
                self._last_token = now
            delay = self._last_token - now
        else:
            slot_already_passed = self._last_token < now
            self._last_token += token_freq
            if slot_already_passed:
                return 0.0
            delay = self._last_token - now
        # Never report a negative sleep when the slot lies in the past.
        return max(delay.total_seconds(), 0.0)

    async def acquire(self):
        """Take a token, waiting for pacing and for a release if necessary.

        Returns:
            True once the token has been taken.
        """
        if self.locked():
            self._waiting += 1
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            try:
                # Pacing sleep first, then wait for release() to grant a
                # token.  Both awaits sit inside the try block so that a
                # cancellation at either point undoes the bookkeeping.
                await asyncio.sleep(self._get_sleep_time())
                await fut
            except BaseException:
                # See the similar code in Queue.get: if release() already
                # woke this waiter, pass the wake-up on to the next one.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
            finally:
                self._waiting -= 1
        else:
            # Uncontended acquire: restart pacing from this acquisition.
            self._last_token = None
        self._capacity -= 1
        self._time_table.append(datetime.datetime.now())
        return True

    def _wake_up_next(self):
        """Resolve the oldest queued waiter that is still pending, if any."""
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    async def release(self):
        """Return one token and wake the next queued acquirer, if any."""
        self._capacity += 1
        self._wake_up_next()

    async def __aexit__(self, exc_type, exc, tb):
        # release() is a coroutine here, so it must be awaited for
        # ``async with`` to actually give the token back.
        await self.release()
我想实现一个异步的令牌桶算法(token bucket algorithm),用来管理何时运行任务(协程)。简而言之,它将控制在任意给定时间跨度内运行的协程数量。
我试过这个以及信号量(semaphore),但两者都不是真正的令牌桶算法。目标是在桶中还有容量时把桶耗尽(即运行协程),否则在运行下一个协程之前等待足够长的时间。
所以我决定继承并重写 Semaphore 类,以控制在任意时间段内运行的协程数量,如下所示:
TokenSemaphore.py
import datetime
from asyncio.locks import Semaphore
import asyncio
import collections
from math import floor
class TokenSemaphore(Semaphore):
    """Semaphore subclass that attempts token-bucket style throttling.

    State kept in addition to the ``asyncio.Semaphore`` base class:

    * ``_capacity``   -- tokens currently available; decremented by
      ``acquire()`` and adjusted by ``release()``.
    * ``_rate``       -- throttling parameter supplied by the caller.
    * ``_time_table`` -- bounded deque (``maxlen`` = initial capacity) of the
      timestamps recorded by ``has_capacity()``.

    NOTE(review): the throttling logic is intentionally kept as originally
    posted; its known problems are:

    * ``has_capacity()`` mutates ``_time_table`` and compares its length
      with the *live* ``_capacity``, which ``acquire()`` keeps decrementing.
    * ``has_capacity()`` treats ``_rate`` as a window in seconds while
      ``release()`` treats it as a frequency (``1 / rate``).
    * ``release()`` adds ``rate * floor(delta)`` tokens, which is 0 whenever
      less than one second has passed since the last recorded timestamp.
    * ``release()`` is a coroutine, unlike ``asyncio.Semaphore.release``.
    * The class forwards ``loop`` and uses the base class' private ``_loop``
      and ``_waiters`` attributes -- presumably written for an older
      asyncio; confirm against the targeted Python version.

    Args:
        capacity: Initial number of tokens; must be >= 0.
        rate: Throttling parameter (see the notes above).
        loop: Forwarded unchanged to ``asyncio.Semaphore``.

    Raises:
        ValueError: If ``capacity`` is negative.
    """

    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._capacity = capacity
        self._rate = rate
        self._time_table = collections.deque(maxlen=self._capacity)
        super().__init__(value=capacity, loop=loop)

    @property
    def capacity(self):
        """Number of tokens currently available."""
        return self._capacity

    def _wake_up_next(self):
        """Resolve the oldest queued waiter that is still pending, if any."""
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def has_capacity(self):
        """Return True if a caller may proceed now, recording a timestamp.

        A caller is admitted while fewer timestamps than ``_capacity`` are
        stored, or once the oldest stored timestamp is at least ``_rate``
        seconds old.  Admission appends the current time to ``_time_table``.
        """
        if len(self._time_table) < self._capacity:
            self._time_table.append(datetime.datetime.now())
            return True
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[0]).total_seconds()
        if delta < self._rate:
            return False
        self._time_table.append(tf)
        return True

    def locked(self):
        """Return True if no tokens are left (``_capacity`` is 0)."""
        return self._capacity == 0

    async def acquire(self):
        """Wait until ``has_capacity()`` admits the caller, then take a token.

        Returns:
            True once a token has been taken.
        """
        while not self.has_capacity():
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except BaseException:
                # See the similar code in Queue.get: if this waiter was
                # already woken, pass the wake-up on before re-raising.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._capacity -= 1
        return True

    async def release(self):
        """Sleep for the remaining token interval, refill and wake a waiter.

        NOTE(review): ``result`` is 0 when under one second has elapsed
        since the newest timestamp, so ``_capacity`` is then left unchanged.
        """
        tf = datetime.datetime.now()
        delta = (tf - self._time_table[-1]).total_seconds()
        result = self._rate * floor(delta)
        interval = 1.0 / float(self._rate)
        sleep_time = interval - result if result < interval else 0
        await asyncio.sleep(sleep_time)
        self._capacity += result
        self._wake_up_next()
注意我的 release() 是 async def,因为我认为当桶里没有足够的令牌时,需要在这里休眠(sleep)。而标准 Semaphore 的 release() 并不是 async def。我觉得这正是我出错的地方,但我不确定。
为了测试我的实现,我写了这个:
run.py
import asyncio
import aiohttp
import re
import datetime
from TokenSemaphore import TokenSemaphore
# URL requested by every worker.
SITE = "https://example.com"
async def myWorker(semaphore):
    """Fetch SITE once while holding a token from ``semaphore``.

    Prints the HTTP status, the time elapsed since the module-level ``ref``
    timestamp and the semaphore's reported capacity.

    Args:
        semaphore: Object exposing awaitable ``acquire()`` / ``release()``
            and a ``capacity`` attribute.
    """
    await semaphore.acquire()
    print("Successfully acquired the semaphore")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(SITE, verify_ssl=False) as resp:
                print(resp.status, datetime.datetime.now() - ref, semaphore.capacity)
    finally:
        # Always hand the token back, even if the request raised.
        print("Releasing Semaphore")
        await semaphore.release()
async def main(loop):
    """Run 44 workers against one shared TokenSemaphore and wait for all.

    Args:
        loop: Event loop passed in by the caller; not used by this function.
    """
    mySemaphore = TokenSemaphore(capacity=40, rate=2)
    # mySemaphore = asyncio.Semaphore(value=40)
    # asyncio.wait() needs tasks/futures; bare coroutines are rejected on
    # newer Python versions, so wrap each worker explicitly.
    tasks = [asyncio.ensure_future(myWorker(mySemaphore)) for _ in range(44)]
    await asyncio.wait(tasks)
    print("Main Coroutine")
# Reference timestamp; myWorker prints the time elapsed since this moment.
ref = datetime.datetime.now()
# Drive main() to completion on the event loop, then close the loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("All Workers Completed")
loop.close()
问题
TokenSemaphore 看起来可以工作,但在还有容量时它并不会把桶耗尽。我的打印语句输出了桶的可用容量,结果表明容量是充足的(即可以运行更多任务)。我无法理解,为什么在容量充足的情况下,我的令牌信号量没有运行更多的协程。
$ python run.py
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:00.177742 20
Releasing Semaphore
200 0:00:00.178944 20
Releasing Semaphore
200 0:00:00.184608 20
Releasing Semaphore
200 0:00:01.103417 20
Releasing Semaphore
200 0:00:01.105539 22
Releasing Semaphore
200 0:00:01.106280 22
Releasing Semaphore
200 0:00:01.106929 22
Releasing Semaphore
200 0:00:01.107701 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:01.110719 29
Releasing Semaphore
200 0:00:01.111228 29
Releasing Semaphore
200 0:00:01.111801 29
Releasing Semaphore
200 0:00:01.112366 29
Releasing Semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
Successfully acquired the semaphore
200 0:00:01.116581 25
Releasing Semaphore
200 0:00:01.153321 25
Releasing Semaphore
200 0:00:01.155235 25
Releasing Semaphore
200 0:00:01.155791 25
Releasing Semaphore
200 0:00:01.156530 25
Releasing Semaphore
200 0:00:01.157258 25
Releasing Semaphore
200 0:00:01.221712 25
Releasing Semaphore
200 0:00:01.223267 25
Releasing Semaphore
200 0:00:01.223724 25
Releasing Semaphore
200 0:00:01.224246 25
Releasing Semaphore
200 0:00:01.224745 25
Releasing Semaphore
200 0:00:01.228829 25
Releasing Semaphore
200 0:00:04.326125 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.361430 30
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:04.910990 29
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.440614 28
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:05.974999 27
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:06.516174 26
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.051482 25
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:07.601656 24
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.147306 23
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:08.682823 22
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.216370 21
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:09.752510 20
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.302981 19
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:10.843989 18
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.384492 17
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:11.939925 16
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:12.485116 15
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.016098 14
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:13.554884 13
Releasing Semaphore
Successfully acquired the semaphore
200 0:00:14.096828 12
Releasing Semaphore
Main Coroutine
All Workers Completed
存在 3 个问题:
- 我们期望 _time_table 的长度能达到 _capacity,但 _capacity 会在 acquire 中递减。最好把对 _time_table 的修改从 has_capacity 中移出去。
- 在 release 中,result 的计算结果为 0,因此协程被唤醒后容量不会增加。只需将容量加 1 即可。
- 一般来说,应该在 acquire 而不是 release 中休眠,这样就不会在执行结束时无缘无故地等待。
看看这个,看看是否有帮助:
class TokenSemaphore(Semaphore):
    """Counting semaphore whose blocked acquirers are released at a paced rate.

    ``capacity`` tokens can be taken immediately.  Once none are left, every
    further ``acquire()`` call queues up, sleeps until its pacing slot (slots
    are spaced ``1 / rate`` seconds apart) and then waits for a ``release()``
    to hand it a token.

    Args:
        capacity: Number of tokens initially available; must be >= 0.
        rate: Tokens per second used to space out queued acquirers; must
            be > 0.
        loop: Forwarded to ``asyncio.Semaphore`` only when given; leave it
            as ``None`` on Python versions that no longer accept it.

    Raises:
        ValueError: If ``capacity`` is negative or ``rate`` is not positive.
    """

    def __init__(self, capacity=1, rate=1, loop=None):
        if capacity < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        if rate <= 0:
            raise ValueError("Token rate must be > 0")
        self._capacity = capacity
        # Number of coroutines currently queued inside acquire().
        self._waiting = 0
        self._rate = rate
        # Timestamps of the most recent successful acquisitions.
        self._time_table = collections.deque(maxlen=self._capacity)
        # Pacing slot reserved for the most recently queued waiter.
        self._last_token = None
        # Only forward ``loop`` when the caller supplied one: newer asyncio
        # versions reject the keyword altogether.
        if loop is None:
            super().__init__(value=capacity)
        else:
            super().__init__(value=capacity, loop=loop)
        # Own the waiter queue instead of relying on the base class, which
        # may create it lazily.
        self._waiters = collections.deque()

    @property
    def capacity(self):
        """Tokens a new caller could take: free tokens minus queued waiters."""
        return max(self._capacity - self._waiting, 0)

    def locked(self):
        """Return True if the semaphore cannot be acquired immediately."""
        return self.capacity == 0

    def _get_sleep_time(self):
        """Reserve the next pacing slot and return the seconds to sleep.

        The first waiter after an uncontended acquire is scheduled one token
        interval after the last recorded acquisition; later waiters are
        scheduled one interval after the previously reserved slot.  A waiter
        that arrives after the previous slot has passed is not delayed.
        """
        now = datetime.datetime.now()
        token_freq = datetime.timedelta(seconds=1.0 / float(self._rate))
        if self._last_token is None:
            if self._time_table:
                self._last_token = self._time_table[-1] + token_freq
            else:
                # Nothing has been acquired yet (e.g. capacity 0): no
                # previous token to space against.
                self._last_token = now
            delay = self._last_token - now
        else:
            slot_already_passed = self._last_token < now
            self._last_token += token_freq
            if slot_already_passed:
                return 0.0
            delay = self._last_token - now
        # Never report a negative sleep when the slot lies in the past.
        return max(delay.total_seconds(), 0.0)

    async def acquire(self):
        """Take a token, waiting for pacing and for a release if necessary.

        Returns:
            True once the token has been taken.
        """
        if self.locked():
            self._waiting += 1
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            try:
                # Pacing sleep first, then wait for release() to grant a
                # token.  Both awaits sit inside the try block so that a
                # cancellation at either point undoes the bookkeeping.
                await asyncio.sleep(self._get_sleep_time())
                await fut
            except BaseException:
                # See the similar code in Queue.get: if release() already
                # woke this waiter, pass the wake-up on to the next one.
                fut.cancel()
                if self._capacity > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
            finally:
                self._waiting -= 1
        else:
            # Uncontended acquire: restart pacing from this acquisition.
            self._last_token = None
        self._capacity -= 1
        self._time_table.append(datetime.datetime.now())
        return True

    def _wake_up_next(self):
        """Resolve the oldest queued waiter that is still pending, if any."""
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    async def release(self):
        """Return one token and wake the next queued acquirer, if any."""
        self._capacity += 1
        self._wake_up_next()

    async def __aexit__(self, exc_type, exc, tb):
        # release() is a coroutine here, so it must be awaited for
        # ``async with`` to actually give the token back.
        await self.release()