多个资源的同步阻塞
Synchronous blocking of multiple resources
抽象情况。我们有 2 只羊,我们可以在时间异步使用 (Semaphore(2)) 和 1 个门,我们可以在时间使用。我们要花 2 次羊穿过门(每次我们需要 1 只羊和 1 个门,持续 1 秒)并喂羊 1 次(需要 1 只羊和 2 秒)。这是代码示例:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await asyncio.gather(
sheep.acquire()
) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]
问题是程序没有以最佳方式运行,原因在输出的第 3 行:spend 2
挡住了羊,但它不能立即使用它,它应该等待被挡住的单门spend 1
。 spend 1
可以喂养的第二只可用绵羊只是浪费了时间:
程序应该如何运行的最佳方式:spend 1
挡住 1 只羊和 1 个门,spend 2
看到门被挡住了,没有理由立即挡住第二只羊。 feed 1
可以挡二羊和运行,而spend 1
是运行宁。在这种情况下,程序将在 2 秒内完成,而不是 3 秒:
如果在 main 的 gather 中更改顺序,很容易看出。
资源不仅要并行阻塞,同步也要阻塞,只有sheep可用gate可用才阻塞sheep和gate。类似的东西:
while sheep.locked() or gate.locked():
asyncio.sleep(0)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
)
但它看起来不像是通用且好的解决方案。可能存在解决此问题的任何模式或更好的方法吗?欢迎任何想法。
您可以实现一个 asynchronous context manager 来处理多个锁。此对象应确保在等待另一个不可用的锁时不持有任何锁:
class multilock(asyncio.locks._ContextManagerMixin):
def __init__(self, *locks):
self.released = list(locks)
self.acquired = []
async def acquire(self):
while self.released:
lock = self.released.pop()
if lock.locked():
self.release()
await lock.acquire()
self.acquired.append(lock)
def release(self):
while self.acquired:
lock = self.acquired.pop()
lock.release()
self.released.append(lock)
示例:
async def test(lock1, lock2):
async with multilock(lock1, lock2):
print('Do something')
基于我为这个例子创建了解决方案。我们需要两件事:
将locked()
函数添加到Sheep
和Gate
,这是检查是否
现在可以获取对象
添加并使用新的MultiAcquire
任务来获取对象只有如果现在可以获取所有对象(否则暂停释放事件) )
这是最终代码,参见 MultiAcquire
- 主要代码:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class MultiAcquire(asyncio.Task):
_check_lock = asyncio.Lock() # to suspend for creating task that acquires objects
_release_event = asyncio.Event() # to suspend for any object was released
def __init__(self, locks):
super().__init__(self._task_coro())
self._locks = locks
# Here we use decorator to subscribe all release() calls,
# _release_event would be set in this case:
for l in self._locks:
l.release = self._notify(l.release)
async def _task_coro(self):
while True:
# Create task to acquire all locks and break on success:
async with type(self)._check_lock:
if not any(l.locked() for l in self._locks): # task would be created only if all objects can be acquired
task = asyncio.gather(*[l.acquire() for l in self._locks]) # create task to acquire all objects
await asyncio.sleep(0) # start task without waiting for it
break
# Wait for any release() to try again:
await type(self)._release_event.wait()
# Wait for task:
return await task
def _notify(self, func):
def wrapper(*args, **kwargs):
type(self)._release_event.set()
type(self)._release_event.clear()
return func(*args, **kwargs)
return wrapper
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await MultiAcquire([sheep, gate]) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await MultiAcquire([sheep]) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
抽象情况。我们有 2 只羊,我们可以在时间异步使用 (Semaphore(2)) 和 1 个门,我们可以在时间使用。我们要花 2 次羊穿过门(每次我们需要 1 只羊和 1 个门,持续 1 秒)并喂羊 1 次(需要 1 只羊和 2 秒)。这是代码示例:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await asyncio.gather(
sheep.acquire()
) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]
问题是程序没有以最佳方式运行,原因在输出的第 3 行:spend 2
挡住了羊,但它不能立即使用它,它应该等待被挡住的单门spend 1
。 spend 1
可以喂养的第二只可用绵羊只是浪费了时间:
程序应该如何运行的最佳方式:spend 1
挡住 1 只羊和 1 个门,spend 2
看到门被挡住了,没有理由立即挡住第二只羊。 feed 1
可以挡二羊和运行,而spend 1
是运行宁。在这种情况下,程序将在 2 秒内完成,而不是 3 秒:
如果在 main 的 gather 中更改顺序,很容易看出。
资源不仅要并行阻塞,同步也要阻塞,只有sheep可用gate可用才阻塞sheep和gate。类似的东西:
while sheep.locked() or gate.locked():
asyncio.sleep(0)
await asyncio.gather(
sheep.acquire(),
gate.acquire()
)
但它看起来不像是通用且好的解决方案。可能存在解决此问题的任何模式或更好的方法吗?欢迎任何想法。
您可以实现一个 asynchronous context manager 来处理多个锁。此对象应确保在等待另一个不可用的锁时不持有任何锁:
class multilock(asyncio.locks._ContextManagerMixin):
def __init__(self, *locks):
self.released = list(locks)
self.acquired = []
async def acquire(self):
while self.released:
lock = self.released.pop()
if lock.locked():
self.release()
await lock.acquire()
self.acquired.append(lock)
def release(self):
while self.acquired:
lock = self.acquired.pop()
lock.release()
self.released.append(lock)
示例:
async def test(lock1, lock2):
async with multilock(lock1, lock2):
print('Do something')
基于
将
locked()
函数添加到Sheep
和Gate
,这是检查是否 现在可以获取对象添加并使用新的
MultiAcquire
任务来获取对象只有如果现在可以获取所有对象(否则暂停释放事件) )
这是最终代码,参见 MultiAcquire
- 主要代码:
import asyncio
class Sheep:
_sem = asyncio.Semaphore(2) # we have 2 avaliable sheeps at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire sheep ({})'.format(self._reason))
def release(self):
print('release sheep ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class Gate:
_sem = asyncio.Semaphore(1) # we have 1 avaliable gate at time
def __init__(self, reason):
self._reason = reason
async def acquire(self):
await type(self)._sem.acquire()
print('acquire gate ({})'.format(self._reason))
def release(self):
print('release gate ({})'.format(self._reason))
type(self)._sem.release()
def locked(self):
return type(self)._sem.locked()
class MultiAcquire(asyncio.Task):
_check_lock = asyncio.Lock() # to suspend for creating task that acquires objects
_release_event = asyncio.Event() # to suspend for any object was released
def __init__(self, locks):
super().__init__(self._task_coro())
self._locks = locks
# Here we use decorator to subscribe all release() calls,
# _release_event would be set in this case:
for l in self._locks:
l.release = self._notify(l.release)
async def _task_coro(self):
while True:
# Create task to acquire all locks and break on success:
async with type(self)._check_lock:
if not any(l.locked() for l in self._locks): # task would be created only if all objects can be acquired
task = asyncio.gather(*[l.acquire() for l in self._locks]) # create task to acquire all objects
await asyncio.sleep(0) # start task without waiting for it
break
# Wait for any release() to try again:
await type(self)._release_event.wait()
# Wait for task:
return await task
def _notify(self, func):
def wrapper(*args, **kwargs):
type(self)._release_event.set()
type(self)._release_event.clear()
return func(*args, **kwargs)
return wrapper
async def spend(reason):
sheep = Sheep(reason)
gate = Gate(reason)
await MultiAcquire([sheep, gate]) # block 1 sheep, 1 gate
await asyncio.sleep(1) # 1 second
print('Spend sheep through a gate')
sheep.release()
gate.release()
async def feed(reason):
sheep = Sheep(reason)
await MultiAcquire([sheep]) # block 1 sheep
await asyncio.sleep(2) # 2 seconds
print('Feed sheep')
sheep.release()
async def main():
await asyncio.gather(
spend('spend 1'),
feed('feed 1'),
spend('spend 2')
) # spend 2 times, feed 1 time
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]