python 的 asyncio 中的可加入 PriorityQueue
Joinable PriorityQueue in python's asyncio
根据 documentation,队列有多种实现。我感兴趣的是 JoinableQueue 和 PriorityQueue,因为我想要一个具有优先级的可连接队列。
看来我只能获得低版本的其中一项功能。 IE。在 3.5 中,我可以区分 Queue(可连接)和 PriorityQueue,但在 3.5 以下的 python 中,有 JoinableQueue 和 PriorityQueue(参见下面的示例)。
是否可以将它们组合起来以获得在 3.4 中获得可连接 PriorityQueue 的通用方法?
try:
# Python 3.4.
from asyncio import JoinableQueue as Queue # joinable
from asyncio import PriorityQueue # I assume this one is not joinable
except ImportError:
# Python 3.5.
from asyncio import Queue # standard joinable
from asyncio import PriorityQueue # I assume this is the one I want
另一种方法可能会以某种方式影响 Queue?
由于 JoinableQueue
和 PriorityQueue
的实现方式,只要您先列出 JoinableQueue
,就可以通过多重继承对两者进行子类化来获得 JoinablePriorityQueue
.
之所以可行,是因为 PriorityQueue
实现起来非常简单:
class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
"""
def _init(self, maxsize):
self._queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self._queue)
虽然 JoinableQueue
更复杂,但它和 PriorityQueue
实现的唯一方法是 _put
,而且至关重要的是,JoinableQUeue
在其调用 super()._put(..)
拥有 put
实现,这意味着它将与 PriorityQueue
正确协作。
这是一个证明它有效的例子:
from asyncio import PriorityQueue, JoinableQueue
import asyncio
import random
class JoinablePriorityQueue(JoinableQueue, PriorityQueue):
pass
@asyncio.coroutine
def consume(q):
while True:
a = yield from q.get()
print("got a {}".format(a))
if a[1] is None:
q.task_done()
return
asyncio.sleep(1)
q.task_done()
@asyncio.coroutine
def produce(q):
for i in range(10):
yield from q.put((random.randint(0,10), i))
yield from q.put((100, None)) # Will be last
asyncio.async(consume(q))
print("waiting...")
yield from q.join()
print("waited")
loop = asyncio.get_event_loop()
q = JoinablePriorityQueue()
loop.run_until_complete(produce(q))
输出:
waiting...
got a (1, 2)
got a (2, 1)
got a (4, 4)
got a (5, 0)
got a (6, 8)
got a (6, 9)
got a (8, 3)
got a (9, 5)
got a (9, 7)
got a (10, 6)
got a (100, None)
waited
根据 documentation,队列有多种实现。我感兴趣的是 JoinableQueue 和 PriorityQueue,因为我想要一个具有优先级的可连接队列。
看来我只能获得低版本的其中一项功能。 IE。在 3.5 中,我可以区分 Queue(可连接)和 PriorityQueue,但在 3.5 以下的 python 中,有 JoinableQueue 和 PriorityQueue(参见下面的示例)。
是否可以将它们组合起来以获得在 3.4 中获得可连接 PriorityQueue 的通用方法?
try:
# Python 3.4.
from asyncio import JoinableQueue as Queue # joinable
from asyncio import PriorityQueue # I assume this one is not joinable
except ImportError:
# Python 3.5.
from asyncio import Queue # standard joinable
from asyncio import PriorityQueue # I assume this is the one I want
另一种方法可能会以某种方式影响 Queue?
由于 JoinableQueue
和 PriorityQueue
的实现方式,只要您先列出 JoinableQueue
,就可以通过多重继承对两者进行子类化来获得 JoinablePriorityQueue
.
之所以可行,是因为 PriorityQueue
实现起来非常简单:
class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
"""
def _init(self, maxsize):
self._queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self._queue)
虽然 JoinableQueue
更复杂,但它和 PriorityQueue
实现的唯一方法是 _put
,而且至关重要的是,JoinableQUeue
在其调用 super()._put(..)
拥有 put
实现,这意味着它将与 PriorityQueue
正确协作。
这是一个证明它有效的例子:
from asyncio import PriorityQueue, JoinableQueue
import asyncio
import random
class JoinablePriorityQueue(JoinableQueue, PriorityQueue):
pass
@asyncio.coroutine
def consume(q):
while True:
a = yield from q.get()
print("got a {}".format(a))
if a[1] is None:
q.task_done()
return
asyncio.sleep(1)
q.task_done()
@asyncio.coroutine
def produce(q):
for i in range(10):
yield from q.put((random.randint(0,10), i))
yield from q.put((100, None)) # Will be last
asyncio.async(consume(q))
print("waiting...")
yield from q.join()
print("waited")
loop = asyncio.get_event_loop()
q = JoinablePriorityQueue()
loop.run_until_complete(produce(q))
输出:
waiting...
got a (1, 2)
got a (2, 1)
got a (4, 4)
got a (5, 0)
got a (6, 8)
got a (6, 9)
got a (8, 3)
got a (9, 5)
got a (9, 7)
got a (10, 6)
got a (100, None)
waited