Python Multiprocessing.Queue.get - 超时和阻塞不起作用?
Python Multiprocessing.Queue.get - timeout and blocking not working?
我正在构建一个 运行 多个进程的应用程序。这些进程通过队列传递对象。一切正常。除了我需要进程在输入队列为空时停留一段时间之外。 queue.get() 方法是在一个无限循环中实现的,但我需要这个循环每隔一段时间 运行 即使他们在队列中没有输入能够 运行一些管理代码。
所以本质上它是一个无限循环,需要在 queue.get() 处阻塞至少一秒钟,并在有输入时做一些事情,否则做一些其他事情。
问题是:超时似乎不起作用:
while True:
# Process input queue, but make sure to continue the loop once every second if the queue is empty
wd_input = None
try:
wd_input = self.input_queue.get(block=True, timeout=1)
except Empty:
logging.debug(f"Watchdog queue empty. (interval: {interval.microseconds} microseconds)")
为清楚起见,我删除了一些代码。请相信我,我正在记录 Empty 异常之间的间隔。我得到这个输出:
[2022-05-04 09:35:27,648] DEBUG: Watchdog queue empty. (interval: 1412 microseconds)
[2022-05-04 09:35:28,650] DEBUG: Watchdog queue empty. (interval: 1306 microseconds)
[2022-05-04 09:35:29,651] DEBUG: Watchdog queue empty. (interval: 1417 microseconds)
[2022-05-04 09:35:30,652] DEBUG: Watchdog queue empty. (interval: 1329 microseconds)
[2022-05-04 09:35:31,654] DEBUG: Watchdog queue empty. (interval: 1323 microseconds)
[2022-05-04 09:35:32,655] DEBUG: Watchdog queue empty. (interval: 1318 microseconds)
[2022-05-04 09:35:33,656] DEBUG: Watchdog queue empty. (interval: 1324 microseconds)
[2022-05-04 09:35:34,658] DEBUG: Watchdog queue empty. (interval: 1322 microseconds)
[2022-05-04 09:35:35,659] DEBUG: Watchdog queue empty. (interval: 1308 microseconds)
这与我想要达到的 1 秒相去甚远。本质上,这也是文档所说的:
If timeout is a positive number, it blocks at most timeout seconds
and raises the queue.Empty exception if no item was available within
that time.
(https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)
所以这也许是预料之中的。我想要的不是最多等待超时秒数,而是至少等待超时秒数。这可能吗?如何做到这一点?
我不知道这个interval.microseconds
是什么,你正在打印出来。但是您是否注意到实际调试消息上的时间戳?这些消息似乎在几毫秒内每秒输出一次。
import logging
import time
from queue import Queue, Empty
logging.basicConfig(format='%(asctime)s %(message)s')
q = Queue()
for _ in range(5):
try:
t0 = time.time()
msg = q.get(timeout=1)
except Empty:
t1 = time.time()
logging.warning(f'is when this queue empty event was logged. Elapsed = {t1 - t0}')
打印:
2022-05-04 07:24:05,768 is when this queue empty event was logged. Elapsed = 1.000537395477295
2022-05-04 07:24:06,769 is when this queue empty event was logged. Elapsed = 1.0004637241363525
2022-05-04 07:24:07,770 is when this queue empty event was logged. Elapsed = 1.000412940979004
2022-05-04 07:24:08,771 is when this queue empty event was logged. Elapsed = 1.000406265258789
2022-05-04 07:24:09,772 is when this queue empty event was logged. Elapsed = 1.000260591506958
我正在构建一个 运行 多个进程的应用程序。这些进程通过队列传递对象。一切正常。除了我需要进程在输入队列为空时停留一段时间之外。 queue.get() 方法是在一个无限循环中实现的,但我需要这个循环每隔一段时间 运行 即使他们在队列中没有输入能够 运行一些管理代码。
所以本质上它是一个无限循环,需要在 queue.get() 处阻塞至少一秒钟,并在有输入时做一些事情,否则做一些其他事情。
问题是:超时似乎不起作用:
while True:
# Process input queue, but make sure to continue the loop once every second if the queue is empty
wd_input = None
try:
wd_input = self.input_queue.get(block=True, timeout=1)
except Empty:
logging.debug(f"Watchdog queue empty. (interval: {interval.microseconds} microseconds)")
为清楚起见,我删除了一些代码。请相信我,我正在记录 Empty 异常之间的间隔。我得到这个输出:
[2022-05-04 09:35:27,648] DEBUG: Watchdog queue empty. (interval: 1412 microseconds)
[2022-05-04 09:35:28,650] DEBUG: Watchdog queue empty. (interval: 1306 microseconds)
[2022-05-04 09:35:29,651] DEBUG: Watchdog queue empty. (interval: 1417 microseconds)
[2022-05-04 09:35:30,652] DEBUG: Watchdog queue empty. (interval: 1329 microseconds)
[2022-05-04 09:35:31,654] DEBUG: Watchdog queue empty. (interval: 1323 microseconds)
[2022-05-04 09:35:32,655] DEBUG: Watchdog queue empty. (interval: 1318 microseconds)
[2022-05-04 09:35:33,656] DEBUG: Watchdog queue empty. (interval: 1324 microseconds)
[2022-05-04 09:35:34,658] DEBUG: Watchdog queue empty. (interval: 1322 microseconds)
[2022-05-04 09:35:35,659] DEBUG: Watchdog queue empty. (interval: 1308 microseconds)
这与我想要达到的 1 秒相去甚远。本质上,这也是文档所说的:
If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. (https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)
所以这也许是预料之中的。我想要的不是最多等待超时秒数,而是至少等待超时秒数。这可能吗?如何做到这一点?
我不知道这个interval.microseconds
是什么,你正在打印出来。但是您是否注意到实际调试消息上的时间戳?这些消息似乎在几毫秒内每秒输出一次。
import logging
import time
from queue import Queue, Empty
logging.basicConfig(format='%(asctime)s %(message)s')
q = Queue()
for _ in range(5):
try:
t0 = time.time()
msg = q.get(timeout=1)
except Empty:
t1 = time.time()
logging.warning(f'is when this queue empty event was logged. Elapsed = {t1 - t0}')
打印:
2022-05-04 07:24:05,768 is when this queue empty event was logged. Elapsed = 1.000537395477295
2022-05-04 07:24:06,769 is when this queue empty event was logged. Elapsed = 1.0004637241363525
2022-05-04 07:24:07,770 is when this queue empty event was logged. Elapsed = 1.000412940979004
2022-05-04 07:24:08,771 is when this queue empty event was logged. Elapsed = 1.000406265258789
2022-05-04 07:24:09,772 is when this queue empty event was logged. Elapsed = 1.000260591506958