使用 Python 的 asyncio 实现时间锁
Implementation of a time lock with Python's asyncio
我不知道这种锁是否称为时间锁,但我需要一些东西用于以下场景:我正在使用 aiohttp
发出大量并发请求,并且服务器可能在某个时候 returns 429 Too Many Requests
。在这种情况下,我必须暂停所有后续请求一段时间。
我想到了以下解决方案:
import asyncio
class TimeLock:
def __init__(self, *, loop=None):
self._locked = False
self._locked_at = None
self._time_lock = None
self._unlock_task = None
self._num_waiters = 0
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()
def __repr__(self):
state = f'locked at {self.locked_at}' if self._locked else 'unlocked'
return f'[{state}] {self._num_waiters} waiters'
@property
def locked(self):
return self._locked
@property
def locked_at(self):
return self._locked_at
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
# in this time lock there is nothing to do when it's released
return
async def acquire(self):
if not self._locked:
return True
try:
print('waiting for lock to be released')
self._num_waiters += 1
await self._time_lock
self._num_waiters -= 1
print('done, returning now')
except asyncio.CancelledError:
if self._locked:
raise
return True
def lock_for(self, delay, lock_more=False):
print(f'locking for {delay}')
if self._locked:
if not lock_more:
# if we don't want to increase the lock time, we just exit when
# the lock is already in a locked state
print('already locked, nothing to do')
return
print('already locked, but canceling old unlock task')
self._unlock_task.cancel()
self._locked = True
self._locked_at = time.time()
self._time_lock = self._loop.create_future()
self._unlock_task = self._loop.create_task(self.unlock_in(delay))
print('locked')
async def unlock_in(self, delay):
print('unlocking started')
await asyncio.sleep(delay)
self._locked = False
self._locked_at = None
self._unlock_task = None
self._time_lock.set_result(True)
print('unlocked')
我正在用这段代码测试锁:
import asyncio
from ares.http import TimeLock
async def run(lock, i):
async with lock:
print(lock)
print(i)
if i in (3, 6, 9):
lock.lock_for(2)
if __name__ == '__main__':
lock = TimeLock()
tasks = []
loop = asyncio.get_event_loop()
for i in range(10):
tasks.append(run(lock, i))
loop.run_until_complete(asyncio.gather(*tasks))
print(lock)
代码产生以下输出,这似乎与我从上述场景中想要的一致:
[unlocked] 0 waiters
0
[unlocked] 0 waiters
1
[unlocked] 0 waiters
2
[unlocked] 0 waiters
3
locking for 2
locked
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
unlocking started
unlocked
done, returning now
[unlocked] 5 waiters
4
done, returning now
[unlocked] 4 waiters
5
done, returning now
[unlocked] 3 waiters
6
locking for 2
locked
done, returning now
[locked at 1559496296.7109463] 2 waiters
7
done, returning now
[locked at 1559496296.7109463] 1 waiters
8
done, returning now
[locked at 1559496296.7109463] 0 waiters
9
locking for 2
already locked, nothing to do
unlocking started
[locked at 1559496296.7109463] 0 waiters
这是实现同步原语的正确方法吗?
我也不确定这段代码的线程安全性。我对线程和异步代码没有太多经验。
我没有测试你的代码,但idea似乎没问题。只有当你要在不同的线程中使用相同的锁对象时,你才应该担心 thread-safety 。正如 Jimmy Engelbrecht 已经指出的那样,asyncio 在单线程中运行,您通常不必担心原语的 thread-safety。
这里还有一些想法:
- 我不太了解术语,但似乎应该将此原语称为 semaphore
- 您可以继承或仅使用 existing primitive(s)
而不是从头开始实施
- 如果应该暂停而不是在客户端代码中执行,您可以委托信号量跟踪事件
这段代码片段展示了这个想法:
import asyncio
class PausingSemaphore:
def __init__(self, should_pause, pause_for_seconds):
self.should_pause = should_pause
self.pause_for_seconds = pause_for_seconds
self._is_paused = False
self._resume = asyncio.Event()
async def __aenter__(self):
await self.check_paused()
return self
async def __aexit__(self, exc_type, exc, tb):
if self.should_pause(exc):
self.pause()
async def check_paused(self):
if self._is_paused:
await self._resume.wait()
def pause(self):
if not self._is_paused:
self._is_paused = True
asyncio.get_running_loop().call_later(
self.pause_for_seconds,
self.unpause
)
def unpause(self):
self._is_paused = False
self._resume.set()
我们来测试一下:
import aiohttp
def should_pause(exc):
return (
type(exc) is aiohttp.ClientResponseError
and
exc.status == 429
)
pausing_sem = None
regular_sem = None
async def request(url):
async with regular_sem:
async with pausing_sem:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, raise_for_status=True) as resp:
print('Done!')
except aiohttp.ClientResponseError:
print('Too many requests!')
raise
async def main():
global pausing_sem
global regular_sem
pausing_sem = PausingSemaphore(should_pause, 5)
regular_sem = asyncio.Semaphore(3)
await asyncio.gather(
*[
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/status/429'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
],
return_exceptions=True
)
if __name__ == '__main__':
asyncio.run(main())
P.S。没怎么测试这段代码!
我不知道这种锁是否称为时间锁,但我需要一些东西用于以下场景:我正在使用 aiohttp
发出大量并发请求,并且服务器可能在某个时候 returns 429 Too Many Requests
。在这种情况下,我必须暂停所有后续请求一段时间。
我想到了以下解决方案:
import asyncio
class TimeLock:
def __init__(self, *, loop=None):
self._locked = False
self._locked_at = None
self._time_lock = None
self._unlock_task = None
self._num_waiters = 0
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()
def __repr__(self):
state = f'locked at {self.locked_at}' if self._locked else 'unlocked'
return f'[{state}] {self._num_waiters} waiters'
@property
def locked(self):
return self._locked
@property
def locked_at(self):
return self._locked_at
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
# in this time lock there is nothing to do when it's released
return
async def acquire(self):
if not self._locked:
return True
try:
print('waiting for lock to be released')
self._num_waiters += 1
await self._time_lock
self._num_waiters -= 1
print('done, returning now')
except asyncio.CancelledError:
if self._locked:
raise
return True
def lock_for(self, delay, lock_more=False):
print(f'locking for {delay}')
if self._locked:
if not lock_more:
# if we don't want to increase the lock time, we just exit when
# the lock is already in a locked state
print('already locked, nothing to do')
return
print('already locked, but canceling old unlock task')
self._unlock_task.cancel()
self._locked = True
self._locked_at = time.time()
self._time_lock = self._loop.create_future()
self._unlock_task = self._loop.create_task(self.unlock_in(delay))
print('locked')
async def unlock_in(self, delay):
print('unlocking started')
await asyncio.sleep(delay)
self._locked = False
self._locked_at = None
self._unlock_task = None
self._time_lock.set_result(True)
print('unlocked')
我正在用这段代码测试锁:
import asyncio
from ares.http import TimeLock
async def run(lock, i):
async with lock:
print(lock)
print(i)
if i in (3, 6, 9):
lock.lock_for(2)
if __name__ == '__main__':
lock = TimeLock()
tasks = []
loop = asyncio.get_event_loop()
for i in range(10):
tasks.append(run(lock, i))
loop.run_until_complete(asyncio.gather(*tasks))
print(lock)
代码产生以下输出,这似乎与我从上述场景中想要的一致:
[unlocked] 0 waiters
0
[unlocked] 0 waiters
1
[unlocked] 0 waiters
2
[unlocked] 0 waiters
3
locking for 2
locked
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
waiting for lock to be released
unlocking started
unlocked
done, returning now
[unlocked] 5 waiters
4
done, returning now
[unlocked] 4 waiters
5
done, returning now
[unlocked] 3 waiters
6
locking for 2
locked
done, returning now
[locked at 1559496296.7109463] 2 waiters
7
done, returning now
[locked at 1559496296.7109463] 1 waiters
8
done, returning now
[locked at 1559496296.7109463] 0 waiters
9
locking for 2
already locked, nothing to do
unlocking started
[locked at 1559496296.7109463] 0 waiters
这是实现同步原语的正确方法吗? 我也不确定这段代码的线程安全性。我对线程和异步代码没有太多经验。
我没有测试你的代码,但idea似乎没问题。只有当你要在不同的线程中使用相同的锁对象时,你才应该担心 thread-safety 。正如 Jimmy Engelbrecht 已经指出的那样,asyncio 在单线程中运行,您通常不必担心原语的 thread-safety。
这里还有一些想法:
- 我不太了解术语,但似乎应该将此原语称为 semaphore
- 您可以继承或仅使用 existing primitive(s) 而不是从头开始实施
- 如果应该暂停而不是在客户端代码中执行,您可以委托信号量跟踪事件
这段代码片段展示了这个想法:
import asyncio
class PausingSemaphore:
def __init__(self, should_pause, pause_for_seconds):
self.should_pause = should_pause
self.pause_for_seconds = pause_for_seconds
self._is_paused = False
self._resume = asyncio.Event()
async def __aenter__(self):
await self.check_paused()
return self
async def __aexit__(self, exc_type, exc, tb):
if self.should_pause(exc):
self.pause()
async def check_paused(self):
if self._is_paused:
await self._resume.wait()
def pause(self):
if not self._is_paused:
self._is_paused = True
asyncio.get_running_loop().call_later(
self.pause_for_seconds,
self.unpause
)
def unpause(self):
self._is_paused = False
self._resume.set()
我们来测试一下:
import aiohttp
def should_pause(exc):
return (
type(exc) is aiohttp.ClientResponseError
and
exc.status == 429
)
pausing_sem = None
regular_sem = None
async def request(url):
async with regular_sem:
async with pausing_sem:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, raise_for_status=True) as resp:
print('Done!')
except aiohttp.ClientResponseError:
print('Too many requests!')
raise
async def main():
global pausing_sem
global regular_sem
pausing_sem = PausingSemaphore(should_pause, 5)
regular_sem = asyncio.Semaphore(3)
await asyncio.gather(
*[
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/status/429'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
request('http://httpbin.org/get'),
],
return_exceptions=True
)
if __name__ == '__main__':
asyncio.run(main())
P.S。没怎么测试这段代码!