使用 asyncio 理解生产者-消费者程序

Understanding producer-consumer program with asyncio

我是 asyncio 编程的新手,正在尝试理解 producer/consumer behavior.I 有一个生产者和 3 个消费者,下面是我的代码片段

import asyncio

async def producer(q:asyncio.Queue) -> None:
    for i in range(3):
        print("Producer sleep for 1 seconds")
        await asyncio.sleep(1)
        print("Producer done sleeping")
        await q.put("Message" + str(i))
        print("Message added to queue")
    return

async def consumer(n: int, q:asyncio.Queue) -> None:
    while True:
        print(f"Consumer{n} going to wait for message")
        i = await q.get()
        await asyncio.sleep(3)
        print(f"Consumer{n} got the message {i}")
        q.task_done()
    return

async def main():
    q = asyncio.Queue()
    t1 = asyncio.create_task(producer(q))
    t2 = [asyncio.create_task(consumer(n, q)) for n in range(3)]
    await asyncio.gather(t1)
    await q.join()

if __name__ == "__main__":
    asyncio.run(main())

当我运行这个我得到低于输出

Producer sleep for 1 seconds
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Producer done sleeping  <<<<<<<<<<<<<<<<<<<<<<
Message added to queue
Producer sleep for 1 seconds
Producer done sleeping
Message added to queue
Consumer0 got the message Message0
Consumer0 going to wait for message
Consumer1 got the message Message1
Consumer1 going to wait for message
Consumer2 got the message Message2
Consumer2 going to wait for message

我预计其中一位消费者会醒来并阅读上面标记 (<<<) 的地方的消息,但这并没有发生。这是为什么?我以为当生产者进入睡眠状态时,它会放弃控制权,而等待 q.get() 的消费者会被唤醒(因为 q 有一条消息要处理),但这并没有发生。我在这里错过了什么?

实际上你没看错。

原因很简单,因为消费者的线路顺序错误:

async def consumer(n: int, q:asyncio.Queue) -> None:
    while True:
        print(f"Consumer{n} going to wait for message")
        i = await q.get()
        await asyncio.sleep(3)  # << here
        print(f"Consumer{n} got the message {i}")
        q.task_done()
    return

消费者在打印前睡了 3 秒。但是在生产者中,你先睡 1 秒,然后在接下来的 2 个循环中再等待 2 秒。

所以 3 秒休眠 消费者收到第一条消息后,足够长的时间让所有消费者消息在生产者任务完成后打印出来。

因此,如果我们更改睡眠和打印顺序,那么输出将是您所期望的。

Producer sleep for 1 seconds
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Consumer0 got the message Message0  <<<
Producer done sleeping
Message added to queue
Producer sleep for 1 seconds
Consumer1 got the message Message1
Producer done sleeping
Message added to queue
Consumer2 got the message Message2
Consumer0 going to wait for message
Consumer1 going to wait for message
Consumer2 going to wait for message

除此之外,您可以使用 await t1 代替 await asyncio.gather(t1)