python 是否提供同步缓冲区?
Does python provide a synchronized buffer?
我对Pythonqueue.Queue非常熟悉。当您想要在消费者和生产者线程之间建立可靠的流时,这绝对是您想要的。
然而,有时您的生产者比消费者更快并且被迫丢弃数据(例如,对于实时视频帧捕获。我们通常可能只想缓冲最后一个或两个帧)。
Python是否提供类似于queue.Queue
的异步缓冲区class?
如何使用 queue.Queue
.
正确实现一个并不十分明显
我可以,例如:
buf = queue.Queue(maxsize=3)
def produce(msg):
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
def consume():
msg = buf.get(block=True)
work(msg)
尽管我不是特别喜欢 produce 不是锁定的队列原子操作。例如,消费可能在 full 和 get 之间开始,对于多生产者场景,它(可能)会被破坏。
是否有开箱即用的解决方案?
队列已经是多处理和多线程安全的,因为您不能同时写入和读取队列。但是,您是正确的,没有什么可以阻止在 full()
和 get
命令之间修改队列。
因此您可以使用锁,这就是您控制多行之间线程访问的方式。锁只能获得一次,所以如果它当前被锁定,所有其他线程将等到它被释放后再继续。
import threading
lock = threading.Lock()
def produce(msg):
lock.acquire()
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
lock.release()
def consume():
msg = None
while !msg:
lock.acquire()
try:
msg = buf.get(block=False)
except queue.Empty:
# buffer is empty, wait and try again
sleep(0.01)
lock.release()
work(msg)
没有为此内置任何内容,但它看起来足够简单,可以构建您自己的缓冲区 class 来包装 Queue
并在 .put()
和 [=14= 之间提供互斥] 有自己的锁,并使用 Condition
变量在添加项目时唤醒潜在的消费者。像这样:
import threading
class SBuf:
def __init__(self, maxsize):
import queue
self.q = queue.Queue()
self.maxsize = maxsize
self.nonempty = threading.Condition()
def get(self):
with self.nonempty:
while not self.q.qsize():
self.nonempty.wait()
assert self.q.qsize()
return self.q.get()
def put(self, v):
with self.nonempty:
while self.q.qsize() >= self.maxsize:
self.q.get()
self.q.put(v)
assert 0 < self.q.qsize() <= self.maxsize
self.nonempty.notify_all()
顺便说一句,我建议不要尝试使用原始锁来构建这种逻辑。当然可以做到,但是 Condition
变量是经过精心设计的,可以避免出现意想不到的竞争条件。 Condition
变量有一个学习曲线,但非常值得攀登:它们通常使事情变得简单而不是耗费脑力。事实上,Python 的 threading
模块在内部使用它们来实现各种事情。
另一种选择
在上面,我们只在我们自己的锁的保护下调用 queue.Queue
方法,所以真的没有必要使用线程安全的容器——我们已经提供了所有的线程安全。
所以使用更简单的容器会更精简一些。令人高兴的是,collections.deque
可以配置为丢弃除最近的 N
条目之外的所有条目,但“以 C 速度”。像这样:
class SBuf:
def __init__(self, maxsize):
import collections
self.q = collections.deque(maxlen=maxsize)
self.maxsize = maxsize
self.nonempty = threading.Condition()
def get(self):
with self.nonempty:
while not self.q:
self.nonempty.wait()
assert self.q
return self.q.popleft()
def put(self, v):
with self.nonempty:
self.q.append(v) # discards oldest, if needed
assert 0 < len(self.q) <= self.maxsize
self.nonempty.notify()
这也将 .notify_all()
更改为 .notify()
。在这个用例中,两者都可以正常工作,但我们只添加了一项,因此 不需要 通知多个消费者。如果有多个消费者在等待,.notify_all()
会唤醒所有消费者,但只有第一个会找到一个非空队列。其他人会看到它是空的,然后再次 .wait()
。
我对Pythonqueue.Queue非常熟悉。当您想要在消费者和生产者线程之间建立可靠的流时,这绝对是您想要的。 然而,有时您的生产者比消费者更快并且被迫丢弃数据(例如,对于实时视频帧捕获。我们通常可能只想缓冲最后一个或两个帧)。
Python是否提供类似于queue.Queue
的异步缓冲区class?
如何使用 queue.Queue
.
我可以,例如:
buf = queue.Queue(maxsize=3)
def produce(msg):
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
def consume():
msg = buf.get(block=True)
work(msg)
尽管我不是特别喜欢 produce 不是锁定的队列原子操作。例如,消费可能在 full 和 get 之间开始,对于多生产者场景,它(可能)会被破坏。
是否有开箱即用的解决方案?
队列已经是多处理和多线程安全的,因为您不能同时写入和读取队列。但是,您是正确的,没有什么可以阻止在 full()
和 get
命令之间修改队列。
因此您可以使用锁,这就是您控制多行之间线程访问的方式。锁只能获得一次,所以如果它当前被锁定,所有其他线程将等到它被释放后再继续。
import threading
lock = threading.Lock()
def produce(msg):
lock.acquire()
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
lock.release()
def consume():
msg = None
while !msg:
lock.acquire()
try:
msg = buf.get(block=False)
except queue.Empty:
# buffer is empty, wait and try again
sleep(0.01)
lock.release()
work(msg)
没有为此内置任何内容,但它看起来足够简单,可以构建您自己的缓冲区 class 来包装 Queue
并在 .put()
和 [=14= 之间提供互斥] 有自己的锁,并使用 Condition
变量在添加项目时唤醒潜在的消费者。像这样:
import threading
class SBuf:
def __init__(self, maxsize):
import queue
self.q = queue.Queue()
self.maxsize = maxsize
self.nonempty = threading.Condition()
def get(self):
with self.nonempty:
while not self.q.qsize():
self.nonempty.wait()
assert self.q.qsize()
return self.q.get()
def put(self, v):
with self.nonempty:
while self.q.qsize() >= self.maxsize:
self.q.get()
self.q.put(v)
assert 0 < self.q.qsize() <= self.maxsize
self.nonempty.notify_all()
顺便说一句,我建议不要尝试使用原始锁来构建这种逻辑。当然可以做到,但是 Condition
变量是经过精心设计的,可以避免出现意想不到的竞争条件。 Condition
变量有一个学习曲线,但非常值得攀登:它们通常使事情变得简单而不是耗费脑力。事实上,Python 的 threading
模块在内部使用它们来实现各种事情。
另一种选择
在上面,我们只在我们自己的锁的保护下调用 queue.Queue
方法,所以真的没有必要使用线程安全的容器——我们已经提供了所有的线程安全。
所以使用更简单的容器会更精简一些。令人高兴的是,collections.deque
可以配置为丢弃除最近的 N
条目之外的所有条目,但“以 C 速度”。像这样:
class SBuf:
def __init__(self, maxsize):
import collections
self.q = collections.deque(maxlen=maxsize)
self.maxsize = maxsize
self.nonempty = threading.Condition()
def get(self):
with self.nonempty:
while not self.q:
self.nonempty.wait()
assert self.q
return self.q.popleft()
def put(self, v):
with self.nonempty:
self.q.append(v) # discards oldest, if needed
assert 0 < len(self.q) <= self.maxsize
self.nonempty.notify()
这也将 .notify_all()
更改为 .notify()
。在这个用例中,两者都可以正常工作,但我们只添加了一项,因此 不需要 通知多个消费者。如果有多个消费者在等待,.notify_all()
会唤醒所有消费者,但只有第一个会找到一个非空队列。其他人会看到它是空的,然后再次 .wait()
。