Data type for a "closable" queue to handle a stream of items for multiple producers and consumers

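Is there a particular kind of "closable" queue for the case where there are multiple producers and consumers, and the data comes from a stream (so it isn't known when it will end)?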

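I haven't been able to find a queue that implements this behavior, or even a name for one, but it seems like an integral type for producer-consumer problems.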

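As an example, ideally I could write code where (1) each producer notifies the queue when it is done, (2) consumers blindly call a blocking get(), and (3) when all consumers are done and the queue is empty, all producers unblock and receive a "done" notification: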

As code, it would look something like this:

from random import randint
from threading import Thread
from time import sleep

def produce():
    for x in range(randint(1, 10)):  # arbitrary bounds, for illustration only
        queue.put(x)
        sleep(randint(0, 2))
    queue.close()  # called once for every producer

def consume():
    while True:
        try:
            print(queue.get())  # blocks until an item arrives or the queue closes
        except ClosedQueue:
            print('done!')
            break

num_producers = randint(1, 5)
queue = QueueTypeThatICantFigureOutANameFor(num_producers)
for _ in range(num_producers):
    Thread(target=produce).start()
for _ in range(randint(1, 5)):
    Thread(target=consume).start()

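Also, I'm *not* looking for a "Poison Pill" solution, where a "done" value is added to the queue for each consumer; I don't like the inelegance of producers needing to know how many consumers there are.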
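For contrast, here is a minimal sketch of the poison-pill pattern being rejected, using the standard library's `queue.Queue`. The sentinel name `PILL` and the producer/consumer counts are illustrative. Note that whoever sends the pills (here the main thread, after joining the producers) has to know exactly how many consumers exist, which is the coupling the question objects to.

```python
import queue
import threading

PILL = object()  # unique sentinel ("poison pill"); a consumer stops when it sees it


def produce(q, items):
    for x in items:
        q.put(x)


def consume(q, out, lock):
    while True:
        item = q.get()
        if item is PILL:
            break  # each consumer swallows exactly one pill
        with lock:
            out.append(item)


q = queue.Queue()
results, lock = [], threading.Lock()
num_consumers = 3  # the pill sender must know this number
consumers = [threading.Thread(target=consume, args=(q, results, lock))
             for _ in range(num_consumers)]
producers = [threading.Thread(target=produce, args=(q, range(p * 10, p * 10 + 10)))
             for p in range(2)]
for t in consumers + producers:
    t.start()
for t in producers:
    t.join()  # every real item is enqueued before any pill
for _ in range(num_consumers):
    q.put(PILL)  # one pill per consumer
for t in consumers:
    t.join()
```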

I'd call it a self-latching queue.

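For your primary requirement, combine the queue with a condition-variable check so that the queue gracefully latches (closes) once all producers have vacated: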

class SelfLatchingQueue(LatchingQueue):
    ...
    def __init__(self, num_producers):
        ...

    def close(self):
        '''Called by a producer to indicate that it is done producing'''

        # ... perhaps check that the current thread is a known producer? ...

        with self.a_mutex:
            self._num_active_producers -= 1
            if self._num_active_producers <= 0:
                # Future put()s throw QueueLatched. get()s will empty the queue
                # and then throw QueueEmpty thereafter
                self.latch()  # Guess what superclass implements this?
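Since `LatchingQueue`, `a_mutex` and `latch()` above are left unspecified, here is one self-contained way the whole thing might look, assuming a single `threading.Condition` guarding a `collections.deque`. The exception names `ClosedQueue` and `QueueLatched` come from the question and the comment above; everything else is a hypothetical sketch, not an existing library class. Unlike the comment above, this `get()` raises `ClosedQueue` (the question's name) rather than `QueueEmpty` once the queue is latched and drained, so the question's `consume()` loop can catch it unchanged.

```python
import collections
import threading


class ClosedQueue(Exception):
    """Raised by get() once every producer has closed and the queue is drained."""


class QueueLatched(Exception):
    """Raised by put() after the queue has latched (all producers closed)."""


class SelfLatchingQueue:
    def __init__(self, num_producers):
        self._items = collections.deque()
        self._cond = threading.Condition()  # guards _items and the counters
        self._num_active_producers = num_producers
        self._latched = num_producers <= 0

    def put(self, item):
        with self._cond:
            if self._latched:
                raise QueueLatched()
            self._items.append(item)
            self._cond.notify()  # wake one consumer blocked in get()

    def close(self):
        """Called once by each producer when it is done producing."""
        with self._cond:
            self._num_active_producers -= 1
            if self._num_active_producers <= 0:
                self._latched = True
                # Wake every blocked consumer so each one can observe the latch.
                self._cond.notify_all()

    def get(self):
        with self._cond:
            # Block while there is nothing to hand out and producers remain.
            while not self._items and not self._latched:
                self._cond.wait()
            if self._items:
                return self._items.popleft()  # drain leftovers even after latching
            raise ClosedQueue()
```

With this class substituted for `QueueTypeThatICantFigureOutANameFor`, the question's `produce()` and `consume()` should work as written. The `notify_all()` on the final `close()` is the important detail: without it, consumers already blocked in `get()` would never wake up to see that the queue has latched.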

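For your secondary requirement (#3 in the original post: finished producers apparently block until all consumers are done), I would probably use a barrier or another condition variable. This could of course be implemented in a subclass of SelfLatchingQueue, but without knowing the codebase I would keep this behavior separate from the self-latching.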
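A minimal sketch of the barrier option, assuming Python's `threading.Barrier` kept separate from the queue itself. To stay self-contained it carries a stripped-down latching queue; the names `LatchingQueue`, `ClosedQueue`, `finished` and the counts are hypothetical. One trade-off to note: the barrier is sized to producers plus consumers, so the setup code (though not the producers themselves) must know the consumer count.

```python
import collections
import threading


class ClosedQueue(Exception):
    pass


class LatchingQueue:
    """Stripped-down stand-in: get() raises ClosedQueue once every producer
    has called close() and the buffered items are exhausted."""

    def __init__(self, num_producers):
        self._items = collections.deque()
        self._cond = threading.Condition()
        self._open_producers = num_producers

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()

    def close(self):
        with self._cond:
            self._open_producers -= 1
            if self._open_producers <= 0:
                self._cond.notify_all()

    def get(self):
        with self._cond:
            while not self._items and self._open_producers > 0:
                self._cond.wait()
            if self._items:
                return self._items.popleft()
            raise ClosedQueue()


NUM_PRODUCERS, NUM_CONSUMERS = 2, 3  # hypothetical counts
q = LatchingQueue(NUM_PRODUCERS)
# Nobody passes finished.wait() until all producers and consumers have arrived.
finished = threading.Barrier(NUM_PRODUCERS + NUM_CONSUMERS)
log, log_lock = [], threading.Lock()


def produce(pid):
    for i in range(5):
        q.put((pid, i))
    q.close()
    finished.wait()  # a finished producer blocks here until every consumer is done
    with log_lock:
        log.append(('producer', pid))


def consume():
    count = 0
    while True:
        try:
            q.get()
            count += 1
        except ClosedQueue:
            break
    with log_lock:
        log.append(('consumer', count))  # recorded before reaching the barrier
    finished.wait()


threads = [threading.Thread(target=produce, args=(p,)) for p in range(NUM_PRODUCERS)]
threads += [threading.Thread(target=consume) for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because each consumer logs before calling `finished.wait()` and each producer logs only after it returns, every consumer entry in `log` precedes every producer entry. If knowing the consumer count at setup is unacceptable, the condition-variable route would instead track outstanding consumers inside a second `threading.Condition`.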