如何在 asyncio.PriorityQueue 处于最大尺寸时将项目从 asyncio.PriorityQueue 中推出,并且我将新项目放入()?

How to push items off of asyncio.PriorityQueue when it is at maxsize and I put() new items?

我有一个 asyncio.PriorityQueue 用作网络爬虫的 URL 队列,当我调用时,得分最低的 URL 是第一个从队列中删除的url_queue.get()。当队列达到 maxsize 个项目时,默认行为是阻止对 url_queue.put() 的调用,直到对 get() 的调用从队列中删除一个项目以进行 space。

我想做的是永远不阻塞,而是在我尝试 put()得分较低的项目。在asyncio.PriorityQueue中有没有一种方法可以通过这种方式自动从堆底部删除项目?如果没有,是否有与 asyncio 一起使用的替代优先级队列/堆实现,这将使我能够做到这一点?或者其他一些数据结构/技术可以让我拥有某种非阻塞的、具有最大大小的优先级队列?

谢谢!

Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue?

默认情况下不是,但从 asyncio.PriorityQueue 继承应该很简单,只需实现所需的行为。与多线程队列实现不同,asyncio队列在单线程中运行,因此不需要担心同步问题。

一个可能的性能问题是 PriorityQueue 没有设计成双端队列,所以它使用堆来存储项目。堆是 minmax,但不能同时是两者; Python 的 heapq 模块实现了最小堆,但您可以通过将优先级乘以 -1 来轻松模拟最大堆。在最小堆中,可以在对数时间内访问和弹出最小项,但不能访问最大项,而在最大堆中,情况恰恰相反。为了有效地操作最小和最大的项目,您需要继承 asyncio.Queue 并使用不同的数据结构来存储项目,例如 sorted list.

例如(未经测试):

class DroppingPriorityQueue(asyncio.Queue):
    def _init(self, maxsize):
        # called by asyncio.Queue.__init__
        self._queue = sortedcontainers.SortedList()

    def _put(self, item):
        # called by asyncio.Queue.put_nowait
        self._queue.add(item)

    def _get(self):
        # called by asyncio.Queue.get_nowait
        # pop the first (most important) item off the queue
        return self._queue.pop(0)

    def __drop(self):
        # drop the last (least important) item from the queue
        self._queue.pop()
        # no consumer will get a chance to process this item, so
        # we must decrement the unfinished count ourselves
        self.task_done()

    def put_nowait(self, item):
        if self.full():
            self.__drop()
        super().put_nowait(item)

    async def put(self, item):
        # Queue.put blocks when full, so we must override it.
        # Since our put_nowait never raises QueueFull, we can just
        # call it directly
        self.put_nowait(item)

class 实现了两个不同的关注点:

  • 它覆盖 _get_put_init 受保护的方法以使用 SortedList as the underlying storage. Although undocumented, these methods are used for building customized queues such as PriorityQueue and LifoQueue 并且已经存在了几十年,首先在 Queue 模块(Python 3 中的 queue)和后来的 asyncio.queue.
  • 它覆盖 putput_nowait public 方法来实现 drop-when-full 语义。