如何在 asyncio.PriorityQueue 处于最大尺寸时将项目从 asyncio.PriorityQueue 中推出,并且我将新项目放入()?
How to push items off of asyncio.PriorityQueue when it is at maxsize and I put() new items?
我有一个 asyncio.PriorityQueue
用作网络爬虫的 URL 队列,当我调用时,得分最低的 URL 是第一个从队列中删除的url_queue.get()
。当队列达到 maxsize
个项目时,默认行为是阻止对 url_queue.put()
的调用,直到对 get()
的调用从队列中删除一个项目以进行 space。
我想做的是永远不阻塞,而是在我尝试 put()
得分较低的项目。在asyncio.PriorityQueue
中有没有一种方法可以通过这种方式自动从堆底部删除项目?如果没有,是否有与 asyncio 一起使用的替代优先级队列/堆实现,这将使我能够做到这一点?或者其他一些数据结构/技术可以让我拥有某种非阻塞的、具有最大大小的优先级队列?
谢谢!
Is there a way to automatically remove items from the bottom of the heap this way in asyncio.PriorityQueue
?
默认情况下不是,但从 asyncio.PriorityQueue
继承应该很简单,只需实现所需的行为。与多线程队列实现不同,asyncio队列在单线程中运行,因此不需要担心同步问题。
一个可能的性能问题是 PriorityQueue
没有设计成双端队列,所以它使用堆来存储项目。堆是 min 或 max,但不能同时是两者; Python 的 heapq
模块实现了最小堆,但您可以通过将优先级乘以 -1 来轻松模拟最大堆。在最小堆中,可以在对数时间内访问和弹出最小项,但不能访问最大项,而在最大堆中,情况恰恰相反。为了有效地操作最小和最大的项目,您需要继承 asyncio.Queue
并使用不同的数据结构来存储项目,例如 sorted list.
例如(未经测试):
class DroppingPriorityQueue(asyncio.Queue):
def _init(self, maxsize):
# called by asyncio.Queue.__init__
self._queue = sortedcontainers.SortedList()
def _put(self, item):
# called by asyncio.Queue.put_nowait
self._queue.add(item)
def _get(self):
# called by asyncio.Queue.get_nowait
# pop the first (most important) item off the queue
return self._queue.pop(0)
def __drop(self):
# drop the last (least important) item from the queue
self._queue.pop()
# no consumer will get a chance to process this item, so
# we must decrement the unfinished count ourselves
self.task_done()
def put_nowait(self, item):
if self.full():
self.__drop()
super().put_nowait(item)
async def put(self, item):
# Queue.put blocks when full, so we must override it.
# Since our put_nowait never raises QueueFull, we can just
# call it directly
self.put_nowait(item)
class 实现了两个不同的关注点:
- 它覆盖
_get
、_put
和 _init
受保护的方法以使用 SortedList
as the underlying storage. Although undocumented, these methods are used for building customized queues such as PriorityQueue
and LifoQueue
并且已经存在了几十年,首先在 Queue
模块(Python 3 中的 queue
)和后来的 asyncio.queue
.
- 它覆盖
put
和 put_nowait
public 方法来实现 drop-when-full 语义。
我有一个 asyncio.PriorityQueue
用作网络爬虫的 URL 队列,当我调用时,得分最低的 URL 是第一个从队列中删除的url_queue.get()
。当队列达到 maxsize
个项目时,默认行为是阻止对 url_queue.put()
的调用,直到对 get()
的调用从队列中删除一个项目以进行 space。
我想做的是永远不阻塞,而是在我尝试 put()
得分较低的项目。在asyncio.PriorityQueue
中有没有一种方法可以通过这种方式自动从堆底部删除项目?如果没有,是否有与 asyncio 一起使用的替代优先级队列/堆实现,这将使我能够做到这一点?或者其他一些数据结构/技术可以让我拥有某种非阻塞的、具有最大大小的优先级队列?
谢谢!
Is there a way to automatically remove items from the bottom of the heap this way in
asyncio.PriorityQueue
?
默认情况下不是,但从 asyncio.PriorityQueue
继承应该很简单,只需实现所需的行为。与多线程队列实现不同,asyncio队列在单线程中运行,因此不需要担心同步问题。
一个可能的性能问题是 PriorityQueue
没有设计成双端队列,所以它使用堆来存储项目。堆是 min 或 max,但不能同时是两者; Python 的 heapq
模块实现了最小堆,但您可以通过将优先级乘以 -1 来轻松模拟最大堆。在最小堆中,可以在对数时间内访问和弹出最小项,但不能访问最大项,而在最大堆中,情况恰恰相反。为了有效地操作最小和最大的项目,您需要继承 asyncio.Queue
并使用不同的数据结构来存储项目,例如 sorted list.
例如(未经测试):
class DroppingPriorityQueue(asyncio.Queue):
def _init(self, maxsize):
# called by asyncio.Queue.__init__
self._queue = sortedcontainers.SortedList()
def _put(self, item):
# called by asyncio.Queue.put_nowait
self._queue.add(item)
def _get(self):
# called by asyncio.Queue.get_nowait
# pop the first (most important) item off the queue
return self._queue.pop(0)
def __drop(self):
# drop the last (least important) item from the queue
self._queue.pop()
# no consumer will get a chance to process this item, so
# we must decrement the unfinished count ourselves
self.task_done()
def put_nowait(self, item):
if self.full():
self.__drop()
super().put_nowait(item)
async def put(self, item):
# Queue.put blocks when full, so we must override it.
# Since our put_nowait never raises QueueFull, we can just
# call it directly
self.put_nowait(item)
class 实现了两个不同的关注点:
- 它覆盖
_get
、_put
和_init
受保护的方法以使用SortedList
as the underlying storage. Although undocumented, these methods are used for building customized queues such asPriorityQueue
andLifoQueue
并且已经存在了几十年,首先在Queue
模块(Python 3 中的queue
)和后来的asyncio.queue
. - 它覆盖
put
和put_nowait
public 方法来实现 drop-when-full 语义。