Are there any deliberately unfair (LIFO) synchronisation constructs within Python async?

I'm looking for a way to rate-limit a recursive algorithm. To demonstrate the problem I'll use the example of exploring a directory tree:

import asyncio
from pathlib import Path

async def explore_tree(path: Path) -> ExploreResult:
    tasks = []
    for child in path.iterdir():
        if child.is_dir():
            tasks.append(explore_tree(child))
        elif child.is_file():
            tasks.append(parse_file(child))
    results = await asyncio.gather(*tasks)
    return combine_results(results)

The problem with this code is the way the number of active tasks grows exponentially. Many of these tasks hold OS resources, so even though an individual task is theoretically cheap, running millions of them at once causes problems. At the same time, we don't want to run one task at a time (no gather), because running them in parallel gives a significant performance improvement.

Python's async Lock is first-in-first-out, and its Semaphore is mostly FIFO apart from a race condition. This makes the exploration roughly breadth-first, so even though most tasks are just waiting, the number of tasks balloons into the millions. As mentioned, that still causes problems and does not address the root cause.

One example of the resulting errors is IOError: [Errno 24] Too many open files. Obviously the number of open files could be capped with a semaphore. But that just shifts the problem onto other resources, and we end up playing whack-a-mole with individual resource limits.
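For concreteness, that band-aid would look something like the sketch below; the parse_file_limited wrapper and the limit of 100 are illustrative assumptions, not code from the question (parse_file is the undefined helper from the snippet above):

import asyncio
from pathlib import Path

open_files = asyncio.Semaphore(100)  # assumed cap on concurrently open files

async def parse_file_limited(path: Path):
    # Gate only the leaf work; with a FIFO semaphore the exploration
    # still proceeds roughly breadth-first while waiters pile up.
    async with open_files:
        return await parse_file(path)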

What I'm looking for is something semaphore-like that is deliberately unfair and prone to starvation. I want a semaphore that is last-in-first-out rather than first-in-first-out. The aim is to use the resource limit to squash the exploration into a pattern that is more depth-first than breadth-first.

It surprised me that Python's Semaphore is not fair. There is a race condition which prioritises new calls to acquire() over tasks that have already been released but have not yet been resumed by the event loop. That is:

sem.release()
await sem.acquire()

The code above will never block, no matter how many tasks are waiting, and it can even re-order the queue of waiting tasks. Unfortunately this makes the object useless for enforcing a strict order.
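A minimal, self-contained reproduction of that race (behaviour observed on the CPython versions this question targets; later releases have reworked Semaphore's fairness, in which case the wait_for below simply times out):

import asyncio

async def main():
    sem = asyncio.Semaphore(0)
    waiter = asyncio.create_task(sem.acquire())  # first in the queue
    await asyncio.sleep(0)                       # let the waiter block
    sem.release()                                # wakes `waiter`... eventually
    try:
        # On affected versions this acquire() steals the slot that
        # release() just handed to `waiter`, jumping the queue.
        await asyncio.wait_for(sem.acquire(), timeout=1)
        print("jumped the queue: this Semaphore is unfair")
    except asyncio.TimeoutError:
        print("blocked: this interpreter's Semaphore is fair")
    waiter.cancel()

asyncio.run(main())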

So I wrote my own:

import asyncio
import collections
from typing import Deque


class FairSemaphore:
    """
    Semaphore with strictly controlled order.

    By default this will be first-in-first-out but can be configured to be last-in-first-out
    """

    _queue: Deque[asyncio.Future]
    _value: int
    _fifo: bool

    def __init__(self, value: int, fifo=True):
        """
        Initialise the semaphore.
        :param value: Initial value for the semaphore
        :param fifo: If True (default) the first task to call acquire() will be the first to be released.
            If False the last task to call acquire() at the moment release() is called will be the first to be released.
        """
        self._value = value
        self._queue = collections.deque()
        self._fifo = fifo

    def locked(self) -> bool:
        """
        Returns True if acquire() would block.
        """
        return not self._value

    async def acquire(self):
        if self._value:
            self._value -= 1
        else:
            loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
            future = loop.create_future()
            self._queue.append(future)

            try:
                await future
            except asyncio.CancelledError:
                # This condition happens when the future's result was set but the task was cancelled
                # In other words another task completed and released this one... but this one got cancelled before it
                # could do anything.  As a result we need to release another.
                if not future.cancelled():
                    self.release()
                # else:
                # But if we were NOT released then we do not have the right to release another.
                raise

    def release(self):
        # Tasks can get cancelled while in the queue.
        # Naively you would expect their acquire() code to remove them from the queue.  But that doesn't always
        # work, because the event loop might not have given them a chance to execute the CancelledError except
        # clause yet.  It's absolutely unavoidable that there can be cancelled tasks waiting in this queue.
        # When that happens the done() state of the future becomes True...
        while self._queue:
            future = self._queue.popleft() if self._fifo else self._queue.pop()
            if not future.done():
                future.set_result(None)
                break
            # ... so we discard any future which is already "done": its task was cancelled while waiting.
        else:
            self._value += 1

    async def __aenter__(self):
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.release()
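With fifo=False this can then be dropped into the original example; the intent is that the most recently queued (deepest) waiters are released first, squeezing the exploration toward depth-first. A sketch, again using the assumed parse_file_limited wrapper from earlier:

sem = FairSemaphore(100, fifo=False)  # LIFO: release the newest waiter first

async def parse_file_limited(path: Path):
    # The deepest, most recently queued tasks get the semaphore first,
    # so the tree is drained depth-first once the limit is hit.
    async with sem:
        return await parse_file(path)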