"closable" 队列的数据类型,用于处理多个生产者和消费者的项目流
Data type for a "closable" queue to handle a stream of items for multiple producers and consumers
是否有一种特定类型的队列是 "closable",适用于有多个生产者、消费者且数据来自流(因此不知道何时结束)的情况?
我一直找不到实现这种行为的队列,也找不到它的名称,但它似乎是生产者-消费者类型问题的完整类型。
举个例子,理想情况下我可以编写这样的代码:(1) 每个生产者在完成时通知队列,(2) 消费者盲目地调用阻塞 get(),以及 (3) 当所有消费者都完成时完成后,队列为空,所有生产者将解除阻塞并收到 "done" 通知:
作为代码,它看起来像这样:
def produce():
for x in range(randint()):
queue.put(x)
sleep(randint())
queue.close() # called once for every producer
def consume():
while True:
try:
print queue.get()
except ClosedQueue:
print 'done!'
break
num_producers = randint()
queue = QueueTypeThatICantFigureOutANameFor(num_producers)
[Thread(target=produce).start() for _ in range(num_producers)]
[Thread(target=consume).start() for _ in range(random())
此外,我 不是 寻找 "Poison Pill" 解决方案,其中 "done" 值被添加到每个消费者的队列中 -- 我不喜欢生产者需要知道有多少消费者的不优雅。
我称之为自锁队列。
根据您的主要要求,将队列与条件变量检查相结合,以便在所有生产者腾出时优雅地锁定(关闭)队列:
class SelfLatchingQueue(LatchingQueue):
...
def __init__(self, num_producers):
...
def close(self):
'''Called by a producer to indicate that it is done producing'''
... perhaps check that current thread is a known producer? ...
with self.a_mutex:
self._num_active_producers -= 1
if self._num_active_producers <= 0:
# Future put()s throw QueueLatched. get()s will empty the queue
# and then throw QueueEmpty thereafter
self.latch() # Guess what superclass implements this?
对于您的次要要求(原始 post 中的#3,完成的生产者显然会阻塞,直到所有消费者都完成),我可能会使用 barrier 或另一个条件变量。当然,这可以在 SelfLatchingQueue 的子类中实现,但是在不知道代码库的情况下,我会将此行为与自动锁定分开。
是否有一种特定类型的队列是 "closable",适用于有多个生产者、消费者且数据来自流(因此不知道何时结束)的情况?
我一直找不到实现这种行为的队列,也找不到它的名称,但它似乎是生产者-消费者类型问题的完整类型。
举个例子,理想情况下我可以编写这样的代码:(1) 每个生产者在完成时通知队列,(2) 消费者盲目地调用阻塞 get(),以及 (3) 当所有消费者都完成时完成后,队列为空,所有生产者将解除阻塞并收到 "done" 通知:
作为代码,它看起来像这样:
def produce():
for x in range(randint()):
queue.put(x)
sleep(randint())
queue.close() # called once for every producer
def consume():
while True:
try:
print queue.get()
except ClosedQueue:
print 'done!'
break
num_producers = randint()
queue = QueueTypeThatICantFigureOutANameFor(num_producers)
[Thread(target=produce).start() for _ in range(num_producers)]
[Thread(target=consume).start() for _ in range(random())
此外,我 不是 寻找 "Poison Pill" 解决方案,其中 "done" 值被添加到每个消费者的队列中 -- 我不喜欢生产者需要知道有多少消费者的不优雅。
我称之为自锁队列。
根据您的主要要求,将队列与条件变量检查相结合,以便在所有生产者腾出时优雅地锁定(关闭)队列:
class SelfLatchingQueue(LatchingQueue):
...
def __init__(self, num_producers):
...
def close(self):
'''Called by a producer to indicate that it is done producing'''
... perhaps check that current thread is a known producer? ...
with self.a_mutex:
self._num_active_producers -= 1
if self._num_active_producers <= 0:
# Future put()s throw QueueLatched. get()s will empty the queue
# and then throw QueueEmpty thereafter
self.latch() # Guess what superclass implements this?
对于您的次要要求(原始 post 中的#3,完成的生产者显然会阻塞,直到所有消费者都完成),我可能会使用 barrier 或另一个条件变量。当然,这可以在 SelfLatchingQueue 的子类中实现,但是在不知道代码库的情况下,我会将此行为与自动锁定分开。