如何在不浪费太多 cpu 周期的情况下等到多线程队列不为空
How to wait until a multithread queue is not empty without wasting too much cpu cycles
我想让一个线程等待,直到多线程队列不为空。队列只有一个生产者和一个消费者。生产者在可用时将任务放入队列中,但生产者必须等到收集到两个或更多任务。我之所以不只使用 get
方法两次来检索两个任务,是因为它会使算法流程过于复杂。不过,这不能在下面的代码片段中描述,因为显然这只是一个过于简单化的示例。
我需要知道队列不为空,以便我可以将队列的峰值(不删除它)与我刚刚使用 get
删除的元素进行比较
睡眠如何完成:
while myQueue.empty():
sleep(0.05)
如何在不使用睡眠的情况下做到这一点?我应该使用 event.wait()
吗?如果是,我不知道应该如何正确使用 event.clear()
命令。由于我要等待的线程也是消费者,我无法确定队列是否为空。即使我用queue.empty()
来检查。
从本质上讲,您似乎需要实施 Queue.peek()
方法,这将 return 队列中的下一个元素而不实际删除它。
这个方法在标准的Queue对象中是没有的,但是你可以继承和扩展它没有问题:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
# ...
现在对于新的peek()
方法的内容,您可以简单地复制粘贴基础Queue
对象的get()
方法的内容并进行一些修改。如果你在 Linux,你可以在 /usr/lib/python?.?/Queue.py
找到它,如果你在 Windows,你可以在 %PYTHONPATH%/lib/Queue.py
找到它(不确定后者,因为我目前在 Linux 机器,无法检查)。在我的Python 2.7副本中,get()
方法实现为:
def get(self, block=True, timeout=None):
# ... lots of comments
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def _get(self):
return self.queue.popleft()
现在,对于差异。您不想 删除 元素,因此我们定义以下内容而不是 _get()
:
def _peek(self):
return self.queue[0]
而在peek()
方法中,我们仍然使用self.not_empty
条件,但我们不再需要self.not_full.notify()
。因此生成的代码将如下所示:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._peek()
return item
finally:
self.not_empty.release()
def _peek(self):
return self.queue[0]
只是 myQueue.get(block=True)
将阻塞您的线程(停止其执行),直到有可从队列中检索的内容。当一个项目在队列中可用时,它将通过此调用返回。你可以添加一个超时,以防你想在队列永远不会被送入时退出。
见https://docs.python.org/3/library/queue.html#queue.Queue.get。
I want to make a thread wait until a multithread queue is not empty.
I want to avoid retrieving the next object, that's why I am not using the get method
如果您不介意使用哨兵对象(我使用一个我命名为 Done
的对象来告诉我的消费者线程我们已经完成,所以它可以结束。)
Start = object() # sentinel object on global scope.
制作人:
queue.put(Start)
在工人中:
item = queue.get() # blocks until something received
if item is Start:
print('we have now started!')
虽然我不确定你为什么要那样做,但这似乎确实符合你的要求。
您可以使用初始化为零的信号量与队列并行。比方说 mySemaphore = threading.Semaphore(0)
。默认情况下调用 mySempahore.acquire()
的线程将被阻塞,因为信号量为零而不触及队列。然后,当您将某些内容放入队列中时,您可以调用 mySemaphore.release()
这将允许一个线程执行(假设是下一个循环)。
我想让一个线程等待,直到多线程队列不为空。队列只有一个生产者和一个消费者。生产者在可用时将任务放入队列中,但生产者必须等到收集到两个或更多任务。我之所以不只使用 get
方法两次来检索两个任务,是因为它会使算法流程过于复杂。不过,这不能在下面的代码片段中描述,因为显然这只是一个过于简单化的示例。
我需要知道队列不为空,以便我可以将队列的峰值(不删除它)与我刚刚使用 get
睡眠如何完成:
while myQueue.empty():
sleep(0.05)
如何在不使用睡眠的情况下做到这一点?我应该使用 event.wait()
吗?如果是,我不知道应该如何正确使用 event.clear()
命令。由于我要等待的线程也是消费者,我无法确定队列是否为空。即使我用queue.empty()
来检查。
从本质上讲,您似乎需要实施 Queue.peek()
方法,这将 return 队列中的下一个元素而不实际删除它。
这个方法在标准的Queue对象中是没有的,但是你可以继承和扩展它没有问题:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
# ...
现在对于新的peek()
方法的内容,您可以简单地复制粘贴基础Queue
对象的get()
方法的内容并进行一些修改。如果你在 Linux,你可以在 /usr/lib/python?.?/Queue.py
找到它,如果你在 Windows,你可以在 %PYTHONPATH%/lib/Queue.py
找到它(不确定后者,因为我目前在 Linux 机器,无法检查)。在我的Python 2.7副本中,get()
方法实现为:
def get(self, block=True, timeout=None):
# ... lots of comments
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def _get(self):
return self.queue.popleft()
现在,对于差异。您不想 删除 元素,因此我们定义以下内容而不是 _get()
:
def _peek(self):
return self.queue[0]
而在peek()
方法中,我们仍然使用self.not_empty
条件,但我们不再需要self.not_full.notify()
。因此生成的代码将如下所示:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._peek()
return item
finally:
self.not_empty.release()
def _peek(self):
return self.queue[0]
只是 myQueue.get(block=True)
将阻塞您的线程(停止其执行),直到有可从队列中检索的内容。当一个项目在队列中可用时,它将通过此调用返回。你可以添加一个超时,以防你想在队列永远不会被送入时退出。
见https://docs.python.org/3/library/queue.html#queue.Queue.get。
I want to make a thread wait until a multithread queue is not empty.
I want to avoid retrieving the next object, that's why I am not using the get method
如果您不介意使用哨兵对象(我使用一个我命名为 Done
的对象来告诉我的消费者线程我们已经完成,所以它可以结束。)
Start = object() # sentinel object on global scope.
制作人:
queue.put(Start)
在工人中:
item = queue.get() # blocks until something received
if item is Start:
print('we have now started!')
虽然我不确定你为什么要那样做,但这似乎确实符合你的要求。
您可以使用初始化为零的信号量与队列并行。比方说 mySemaphore = threading.Semaphore(0)
。默认情况下调用 mySempahore.acquire()
的线程将被阻塞,因为信号量为零而不触及队列。然后,当您将某些内容放入队列中时,您可以调用 mySemaphore.release()
这将允许一个线程执行(假设是下一个循环)。