python 是否提供同步缓冲区?

Does python provide a synchronized buffer?

我对Pythonqueue.Queue非常熟悉。当您想要在消费者和生产者线程之间建立可靠的流时,这绝对是您想要的。 然而,有时您的生产者比消费者更快并且被迫丢弃数据(例如,对于实时视频帧捕获。我们通常可能只想缓冲最后一个或两个帧)。

Python是否提供类似于queue.Queue的异步缓冲区class? 如何使用 queue.Queue.

正确实现一个并不十分明显

我可以,例如:

buf = queue.Queue(maxsize=3)
def produce(msg):
   if buf.full():
      buf.get(block=False)  # Make space
   buf.put(msg, block=False)

def consume():
   msg = buf.get(block=True)
   work(msg)

尽管我不是特别喜欢 produce 不是锁定的队列原子操作。例如,消费可能在 full 和 get 之间开始,对于多生产者场景,它(可能)会被破坏。

是否有开箱即用的解决方案?

队列已经是多处理和多线程安全的,因为您不能同时写入和读取队列。但是,您是正确的,没有什么可以阻止在 full()get 命令之间修改队列。

因此您可以使用锁,这就是您控制多行之间线程访问的方式。锁只能获得一次,所以如果它当前被锁定,所有其他线程将等到它被释放后再继续。

import threading

lock = threading.Lock()

def produce(msg):
   lock.acquire()
   if buf.full():
      buf.get(block=False)  # Make space
   buf.put(msg, block=False)
   lock.release()

def consume():
   msg = None

   while !msg:
       lock.acquire()
       try:
           msg = buf.get(block=False)
       except queue.Empty:
           # buffer is empty, wait and try again
           sleep(0.01)
       lock.release()

   work(msg)

没有为此内置任何内容,但它看起来足够简单,可以构建您自己的缓冲区 class 来包装 Queue 并在 .put() 和 [=14= 之间提供互斥] 有自己的锁,并使用 Condition 变量在添加项目时唤醒潜在的消费者。像这样:

import threading

class SBuf:
    def __init__(self, maxsize):
        import queue
        self.q = queue.Queue()
        self.maxsize = maxsize
        self.nonempty = threading.Condition()

    def get(self):
        with self.nonempty:
            while not self.q.qsize():
                self.nonempty.wait()
            assert self.q.qsize()
            return self.q.get()

    def put(self, v):
        with self.nonempty:
            while self.q.qsize() >= self.maxsize:
                 self.q.get()
            self.q.put(v)
            assert 0 < self.q.qsize() <= self.maxsize
            self.nonempty.notify_all()

顺便说一句,我建议不要尝试使用原始锁来构建这种逻辑。当然可以做到,但是 Condition 变量是经过精心设计的,可以避免出现意想不到的竞争条件。 Condition 变量有一个学习曲线,但非常值得攀登:它们通常使事情变得简单而不是耗费脑力。事实上,Python 的 threading 模块在内部使用它们来实现各种事情。

另一种选择

在上面,我们只在我们自己的锁的保护下调用 queue.Queue 方法,所以真的没有必要使用线程安全的容器——我们已经提供了所有的线程安全。

所以使用更简单的容器会更精简一些。令人高兴的是,collections.deque 可以配置为丢弃除最近的 N 条目之外的所有条目,但“以 C 速度”。像这样:

class SBuf:
    def __init__(self, maxsize):
        import collections
        self.q = collections.deque(maxlen=maxsize)
        self.maxsize = maxsize
        self.nonempty = threading.Condition()

    def get(self):
        with self.nonempty:
            while not self.q:
                self.nonempty.wait()
            assert self.q
            return self.q.popleft()

    def put(self, v):
        with self.nonempty:
            self.q.append(v) # discards oldest, if needed
            assert 0 < len(self.q) <= self.maxsize
            self.nonempty.notify()

这也将 .notify_all() 更改为 .notify()。在这个用例中,两者都可以正常工作,但我们只添加了一项,因此 不需要 通知多个消费者。如果有多个消费者在等待,.notify_all() 会唤醒所有消费者,但只有第一个会找到一个非空队列。其他人会看到它是空的,然后再次 .wait()