在设置了queue.get()超时的情况下,仍然无法正常从队列中读取消息

with the condition that timeout for queue.get() has been set, still cannot read msg from queue normally

首先,我与 if not q.empty()

一起工作
from multiprocessing import Process, Queue, current_process
import datetime, time

def func1(q):
    for i in range(5):
        if not q.full():
            print('Put %s to queue...' % i)
            q.put(i)
        else:
            print(current_process().name, 'is Full', current_process())

def func2(q):
    while True:
        if not q.empty():
            i = q.get(block=True, timeout=1)
            print('Get %s from queue.' % i)
        else:
            print('queue empty')
            break

if __name__=='__main__':
    q = Queue(10)
    ProA = Process(target=func1, name='A', args=(q,))
    ProB = Process(target=func2, name='B', args=(q,))
    ProA.daemon = True
    ProB.daemon = True

    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
    ProA.start()
    print('*' * 10, 'Since Now Start Subprocess A')
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())

    ProA.join()
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())

    ProB.start()
    print('*' * 10, 'Since Now Start Subprocess B')
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())

    ProB.join()
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())

结果显示如下:

2022-01-01 16:34:07.211678 A is Alive: False
2022-01-01 16:34:07.211738 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 16:34:07.213302 A is Alive: True
2022-01-01 16:34:07.213384 B is Alive: False
Put 0 to queue...
Put 1 to queue...
Put 2 to queue...
Put 3 to queue...
Put 4 to queue...
2022-01-01 16:34:07.244838 A is Alive: False
2022-01-01 16:34:07.244857 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 16:34:07.245714 A is Alive: False
2022-01-01 16:34:07.245756 B is Alive: True
Get 0 from queue.
Get 1 from queue.
Get 2 from queue.
Get 3 from queue.
Get 4 from queue.
queue empty
2022-01-01 16:34:07.272992 A is Alive: False
2022-01-01 16:34:07.273008 B is Alive: False

Process finished with exit code 0

但是,当我将条件更改为if not q.qsize == 0时, 将 func2 的部分替换为

def func2(q):
    while True:
        try:
            if q.qsize() != 0:
                i = q.get(block=True, timeout=1)
                print('Get %s from queue.' % i)
        except:
            print('q.qsize == 0')
            break

我得到了

2022-01-01 16:37:09.681813 A is Alive: False
2022-01-01 16:37:09.681980 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 16:37:09.684073 A is Alive: True
2022-01-01 16:37:09.684134 B is Alive: False
Put 0 to queue...
Put 1 to queue...
Put 2 to queue...
Put 3 to queue...
Put 4 to queue...
2022-01-01 16:37:09.720998 A is Alive: False
2022-01-01 16:37:09.721029 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 16:37:09.722099 A is Alive: False
2022-01-01 16:37:09.722140 B is Alive: True
q.qsize == 0
2022-01-01 16:37:09.761179 A is Alive: False
2022-01-01 16:37:09.761202 B is Alive: False

Process finished with exit code 0

我想我至少应该得到一些值作为输出...?不过好像代码只是跳到了判断条件'except',也就是说q.qsize这里等于0 ...???

这里有一些如何使用队列的例子。但首先,每个人都需要了解底层实现。 multiprocessing.Queue是使用multiprocessing.Pipe实现的,容量有限。也就是说,除非有其他进程正在读取数据,否则一个进程无法在没有最终阻塞的情况下以无限制的方式将数据发送到 Pipe 的底层连接。然而,您可以定义一个具有任意大容量甚至无限容量的队列,这样您就可以继续向队列发出 put 调用而不会阻塞。这是如何实现的?队列实例实际上启动了一个新线程,负责对管道进行底层写入,这个线程 阻塞,而调用 put 的线程可以继续运行。但这意味着在创建用于写入管道的线程终止之前进程不会终止,并且在管道能够接受数据之前它不会终止。这是什么意思呢?如果您有一个进程 A 是生产者进程 B 放置的队列中项目的消费者,则进程 A 必须首先从队列中获取所有项目,然后再尝试 join 进程 B(即等待进程 B 完成) 因为进程 B 可能无法完成,因为它可能在等待进程 A 从队列中获取项目时被阻塞。

示例 1

在此示例中,主进程启动了两个新进程,一个是项目的生产者 func1,另一个是这些项目的消费者 func2,并等待这两个进程完成。处理这种情况的最简单方法之一是生产者放置一个特殊的 sentinel 项目,该项目不会与常规项目混淆,表明没有更多项目需要处理。换句话说,哨兵充当文件结束指示器。因此,消费者只是简单地阻塞队列上的 get 调用,直到它检测到哨兵项,然后它可以终止。在这种情况下 None 被用作哨兵。

from multiprocessing import Process, Queue

def func1(q):
    for i in range(10):
        print('Put %s to queue...' % i)
        q.put(i)
    # Put special item signaling there are no more:
    q.put(None)

def func2(q):
    while True:
        i = q.get(block=True)
        if i is None:
            # There will be no more items on the queue
            break
        print('Get %s from queue.' % i)

if __name__=='__main__':
    q = Queue()
    proA = Process(target=func1, name='A', args=(q,))
    proB = Process(target=func2, name='B', args=(q,))
    proA.start()
    proB.start()
    proA.join()
    proB.join()

示例 2

在这个例子中,我们有两个队列:一个“工作队列”,work_q,用于放置要平方的数字和一个“结果队列”,results_q,其中从工作队列中获得的项目的平方结果被放置。和以前一样 func1 将一个哨兵放入工作队列以指示没有更多项目要处理。 func2 不需要将哨兵放入结果队列,因为 func1 知道队列中应该有多少结果。

from multiprocessing import Process, Queue

def func1(work_q, results_q):
    for i in range(10):
        work_q.put(i)
    # Put special item signaling there are no more:
    work_q.put(None)
    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())

def func2(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    proA = Process(target=func1, name='A', args=(work_q, results_q))
    proB = Process(target=func2, name='B', args=(work_q, results_q))
    proA.start()
    proB.start()
    proA.join()
    proB.join()

示例 3

这个例子与前面的例子相似,只是 func1 不费心写一个哨兵,所以 func2 只是循环永远从工作队列中获取并放入结果队列。由于 func2 永不终止,因此以该函数为目标的进程必须是守护进程,以便在主进程终止时终止。

from multiprocessing import Process, Queue

def func1(work_q, results_q):
    for i in range(10):
        work_q.put(i)
    # Put special item signaling there are no more:
    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())

def func2(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    proA = Process(target=func1, name='A', args=(work_q, results_q))
    proB = Process(target=func2, name='B', args=(work_q, results_q), daemon=True)
    proA.start()
    proB.start()
    proA.join()

示例 4

在前面的所有例子中,主进程启动了生产者和消费者进程,然后什么也不做,只是等待这些进程完成。看来我们可以少一个进程,即主进程本身可以是生产者。

from multiprocessing import Process, Queue


def func(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()

    for i in range(10):
        work_q.put(i)
    # Put special item signaling there are no more:
    work_q.put(None)

    proc = Process(target=func, name='A', args=(work_q, results_q))
    proc.start()

    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())

    proc.join()

回到我的开场讨论。 请注意,这里的主要过程是在加入生产者之前检索生产者的10个结果。如果我要移动语句proc.join() 调用q.get()的循环之前,这将是一个严重的错误。

示例 5

最后,有一种间接方法可以在不获得显式计数的情况下对队列消息进行“计数”,即使用 multiprocessing.JoinableQueue,它支持另外两种方法 task_done()join().部分文档指出:

task_done():表示之前入队的任务已经完成。由队列消费者使用。对于每个用于获取任务的 get(),对 task_done() 的后续调用会告诉队列该任务的处理已完成。

join(): 阻塞直到队列中的所有项目都被获取和处理。

每当有项目添加到队列中时,未完成任务的计数就会增加。每当消费者调用 task_done() 以指示该项目已被检索并且所有工作已完成时,计数就会下降。当未完成任务的计数降为零时,join() 解除阻塞。

在下面的例子中,主进程将10,000个数字放入工作队列中。然而,这一次,消费者进程(也是结果的生产者)func2 将不确定数量的结果放入结果队列。所以主进程不能简单地使用计数器来检索不知道要计数多少的结果。 func2 也不能将 sentinel 项目放入结果队列,因为它处于无限循环中,因此无法知道将 sentinel 插入何处。

因此,我们使用JoinableQueue作为工作队列。然后主进程可以在工作队列上发出对 join 的调用,并且知道当该调用完成时 func2 已经处理了工作队列中的所有项目,即它已将所有结果放入结果队列。因此,主进程可以非阻塞进入循环,直到得到queue.Empty异常。

from multiprocessing import Process, Queue, JoinableQueue
import queue


def func(work_q, results_q):
    # This function "returns" an indeterminate number of results:
    while True:
        i = work_q.get(block=True)
        # Just return results for even values of i:
        if i % 2 == 0:
            results_q.put(i ** 2)
        # Show we have processed the item from the work_q:
        work_q.task_done()

if __name__=='__main__':
    work_q = JoinableQueue()
    results_q = Queue()

    proc = Process(target=func, name='A', args=(work_q, results_q), daemon=True)
    proc.start()

    for i in range(10_000):
        work_q.put(i)

    work_q.join()
    # All items have been read from the work_q and whatever results there are
    # have been put to the results_q:
    cnt = 0
    try:
        while True:
            print(results_q.get(block=False))
            cnt += 1
    except queue.Empty:
        pass
    print(cnt, 'items retrieved')

打印:

0
4
16
36
64
100
...
99760144
99800100
99840064
99880036
99920016
99960004
5000 items retrieved

示例 6

上面的例子,但这次只是使用“常规”队列,并为工作队列和结果队列使用哨兵。这也允许主进程在生成结果时对结果执行某些操作,而不必像前面的示例那样等待所有结果写入结果队列。

from multiprocessing import Process, Queue


def func(work_q, results_q):
    # This function "returns" an indeterminate number of results:
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        # Just return results for even values of i:
        if i % 2 == 0:
            results_q.put(i ** 2)
    results_q.put(None) # all results have been put

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()

    proc = Process(target=func, name='A', args=(work_q, results_q))
    proc.start()

    for i in range(10_000):
        work_q.put(i)
    work_q.put(None) # All items have been put

    cnt = 0
    while True:
        result = results_q.get(block=True)
        if result is None:
            break
        print(result)
        cnt += 1
    print(cnt, 'items retrieved')

    proc.join()