多个资源的同步阻塞

Synchronous blocking of multiple resources

抽象情况。我们有 2 只羊,我们可以在时间异步使用 (Semaphore(2)) 和 1 个门,我们可以在时间使用。我们要花 2 次羊穿过门(每次我们需要 1 只羊和 1 个门,持续 1 秒)并喂羊 1 次(需要 1 只羊和 2 秒)。这是代码示例:

import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await asyncio.gather(
        sheep.acquire(), 
        gate.acquire()
    )  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await  asyncio.gather(
        sheep.acquire()
    )  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]

问题是程序没有以最佳方式运行,原因在输出的第 3 行:spend 2 挡住了羊,但它不能立即使用它,它应该等待被挡住的单门spend 1spend 1 可以喂养的第二只可用绵羊只是浪费了时间:

程序应该如何运行的最佳方式:spend 1 挡住 1 只羊和 1 个门,spend 2 看到门被挡住了,没有理由立即挡住第二只羊。 feed 1可以挡二羊和运行,而spend 1是运行宁。在这种情况下,程序将在 2 秒内完成,而不是 3 秒:

如果在 main 的 gather 中更改顺序,很容易看出。

资源不仅要并行阻塞,同步也要阻塞,只有sheep可用gate可用才阻塞sheep和gate。类似的东西:

while sheep.locked() or gate.locked():
    asyncio.sleep(0)
await asyncio.gather(
    sheep.acquire(), 
    gate.acquire()
)

但它看起来不像是通用且好的解决方案。可能存在解决此问题的任何模式或更好的方法吗?欢迎任何想法。

您可以实现一个 asynchronous context manager 来处理多个锁。此对象应确保在等待另一个不可用的锁时不持有任何锁:

class multilock(asyncio.locks._ContextManagerMixin):

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

示例:

async def test(lock1, lock2):
    async with multilock(lock1, lock2):
        print('Do something')

基于我为这个例子创建了解决方案。我们需要两件事:

  1. locked()函数添加到SheepGate,这是检查是否 现在可以获取对象

  2. 添加并使用新的MultiAcquire任务来获取对象只有如果现在可以获取所有对象(否则暂停释放事件) )

这是最终代码,参见 MultiAcquire - 主要代码:

import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class MultiAcquire(asyncio.Task):
    _check_lock = asyncio.Lock()  # to suspend for creating task that acquires objects
    _release_event = asyncio.Event()  # to suspend for any object was released

    def __init__(self, locks):
        super().__init__(self._task_coro())
        self._locks = locks
        # Here we use decorator to subscribe all release() calls,
        # _release_event would be set in this case:
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        while True:
            # Create task to acquire all locks and break on success:
            async with type(self)._check_lock:
                if not any(l.locked() for l in self._locks):  # task would be created only if all objects can be acquired
                    task = asyncio.gather(*[l.acquire() for l in self._locks])  # create task to acquire all objects 
                    await asyncio.sleep(0)  # start task without waiting for it
                    break
            # Wait for any release() to try again:
            await type(self)._release_event.wait()
        # Wait for task:
        return await task

    def _notify(self, func):
        def wrapper(*args, **kwargs):
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]