如何正确排队?为什么睡眠消费者使队列工作?

How to properly make a queue? Why does sleep on consumer make the queue work?

我正在尝试实现一个队列。这是旧代码,要么取自我前段时间做的某种教程,要么取自我阅读文档时所做的某种实验,或者两者的混合。事情是我不确定代码是否是我的,但我试图用它作为一个例子来学习。该脚本有一个生产者在列表中产生数字,2 个消费者竞争获取这些数字并将它们相加,总和最高的获胜。

所以,这是我的问题:在“consume_numbers”函数的以下代码中,我有一个 time.sleep(0.01) 行,它使代码成为 运行。没有它,代码挂起,有了它 运行 就很顺利了。有人可以解释为什么会发生这种情况以及我如何在没有这个问题的情况下实现队列吗?

import concurrent.futures
import time
import random
import threading
import queue


class MyQueue(queue.Queue):
    def __init__(self, maxsize=10):
        super().__init__()
        self.maxsize = maxsize
        self.numbers = []

    def set_number(self, number):
        self.put(number)
        self.numbers.append(number)

    def get_number(self):
        return self.get()


def produce_random_numbers(q: MyQueue, maxcount: int, evnt: threading.Event):
    count = 0
    while not evnt.is_set():
        num = random.randint(1, 5)
        q.set_number(num)
        count += 1
        if count > maxcount:
            event.set()


def consume_numbers(q: MyQueue, consumed: list, evnt: threading.Event):
    while not q.empty() or not evnt.is_set():
        num = q.get_number()
        time.sleep(0.01)
        consumed.append(num)


if __name__ == "__main__":
    q = MyQueue(maxsize=10)
    event = threading.Event()
    cons1 = []
    cons2 = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        ex.submit(produce_random_numbers, q, 50, event)
        ex.submit(consume_numbers, q, cons1, event)
        ex.submit(consume_numbers, q, cons2, event)
    event.set()
    print(f'Generated Numbers: {q.numbers}')
    print(f'Numbers Consumed by Thread1 which summed up to {sum(cons1)} are: {cons1}')
    print(f'Numbers Consumed by Thread2 which summed up to {sum(cons2)} are: {cons2}')
    if sum(cons1) > sum(cons2):
        print("Thread1 Wins!")
    elif sum(cons1) < sum(cons2):
        print("Thread2 Wins!")
    else:
        print("It's a tie!")

谢谢!

该代码并未从头实现队列,而是扩展queue.Queue以添加内存。有一个事件对象用于向消费者发出生产者线程已完成的信号。当队列中只有一项时,消费者中存在隐藏的竞争条件。

检查 not q.empty() or not evnt.is_set() 将 运行 循环代码,如果队列中有内容或事件尚未设置。可能会发生:

  1. 一个线程看到队列不为空,进入循环
  2. 发生线程切换,另一个线程消费了最后一项
  3. 第一个线程发生切换,调用 get_number() 并阻塞

evnt.is_set() 检查时会发生类似的竞争情况:

  1. 生产者将最后一项添加到队列中,并发生线程切换
  2. 一个线程消费最后一项,一个开关
  3. 发生线程切换,消费者获取最后一项并返回循环条件。由于事件尚未设置,循环被执行并且 get_number() 阻塞

让线程等待可以最大限度地减少这些情况发生的可能性。如果没有等待,很可能单个消费者线程将队列中的所有项目都消费完,而另一个消费者线程仍在进入其循环。

使用超时很麻烦。避免使用事件的一个有用的习惯用法是使用 iter 并使用不可能的值作为标记:

# --- snip ---
def produce_random_numbers(q: MyQueue, maxcount: int, n_consumers: int):
    for _ in range(maxcount):
        num = random.randint(1, 5)
        q.set_number(num)
    for _ in range(n_consumers):
        q.put(None)  # <--- I use put to put one sentinel per consumer


def consume_numbers(q: MyQueue, consumed: list):
    for num in iter(q.get_number, None):
        consumed.append(num)


if __name__ == "__main__":
    q = MyQueue(maxsize=10)
    cons1 = []
    cons2 = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        ex.submit(produce_random_numbers, q, 500000, 2)
        ex.submit(consume_numbers, q, cons1)
        ex.submit(consume_numbers, q, cons2)
    print(f'Generated Numbers: {q.numbers}')
# --- snip ---

还有一些其他问题和事情我会做不同的事情:

  • with...块后的event.set()没用:事件已经被生产者设置
  • 生产者中有一个拼写错误,使用了全局 event 变量而不是局部 evnt 参数。幸运的是,它们指的是同一个对象。
  • 因为只有一个生产者,所以不会有问题。否则 MyQueue.numbers 的顺序可能与项目添加到队列的顺序不同:
    1. put 在一个线程上被调用
    2. 线程切换发生
    3. a put + append 发生在新线程中
    4. 发生线程切换,第一个值为appended
  • 而不是定义 MyQueue.set_number 我会覆盖 put