Python asyncio - 使用 asyncio.Event() 阻塞消费者

Python asyncio - consumer blocking with asyncio.Event()

我有一个程序有一个生产者和两个 消费者,我想用协程重写它,这样每个消费者将处理 只为它生成的最后一个值(即跳过处理旧值时生成的新值)(我使用线程和threading.Queue()但它在put()上阻塞,因为队列最满当时)。

阅读后我决定使用asyncio.Eventasyncio.Queue。我写了这个原型程序:

import asyncio

async def l(event, q):
    h = 1
    while True:
        # ready
        event.set()
        # get value to process
        a = await q.get()
        # process it
        print(a * h)
        h *= 2

async def m(event, q):
    i = 1
    while True:
        # pass element to consumer, when it's ready
        if event.is_set():
            await q.put(i)
            event.clear()
        # produce value
        i += 1

el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
            asyncio.ensure_future(l(ev, qu)),
            asyncio.ensure_future(m(ev, qu))
        ]
el.run_until_complete(asyncio.gather(*tasks))
el.close()

我注意到 l 协程在 q.get() 行阻塞并且不打印任何内容。

在两者中添加 asyncio.sleep() 后,它按预期工作(我得到 1,11,21,...):

import asyncio
import time

async def l(event, q):
    h = 1
    a = 1
    event.set()
    while True:
        # await asyncio.sleep(1)
        a = await q.get()
        # process it
        await asyncio.sleep(1)
        print(a * h)
        event.set()

async def m(event, q):
    i = 1
    while True:
        # pass element to consumer, when it's ready
        if event.is_set():
            await q.put(i)
            event.clear()
        await asyncio.sleep(0.1)
        # produce value
        i += 1

el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
            asyncio.ensure_future(l(ev, qu)),
            asyncio.ensure_future(m(ev, qu))
        ]
el.run_until_complete(asyncio.gather(*tasks))
el.close()

...但我正在寻找没有它的解决方案。

为什么会这样?我该如何解决?我想我不能从 m 调用 await l(),因为它们都有状态(在原始程序中,第一个使用 PyGame 绘制解决方案,第二个绘制结果)。

代码没有像任务那样按预期工作运行 m 函数从未停止。如果 event.is_set() == False,任务将继续递增 i。因为这个任务永远不会被挂起,所以任务 运行 函数 l 永远不会被调用。因此,你需要一个方法来暂停任务运行的函数m。一种暂停方式是等待另一个协程,这就是 asyncio.sleep 按预期工作的原因。

我认为下面的代码可以按您的预期工作。 LeakyQueue 将确保消费者只处理来自生产者的最后一个值。由于复杂度非常对称,消费者将消费生产者生产的所有价值。如果增加延迟参数,可以模拟消费者只处理生产者最后创建的值。

import asyncio

class LeakyQueue(asyncio.Queue):
    async def put(self, item):
        if self.full():
            await self.get()
        await super().put(item)

async def consumer(queue, delay=0):
    h = 1
    while True:
        a = await queue.get()
        if delay:
            await asyncio.sleep(delay)
        print ('consumer', a)
        h += 2

async def producer(queue):
    i = 1
    while True:
        await asyncio.ensure_future(queue.put(i))
        print ('producer', i)
        i += 1

loop = asyncio.get_event_loop()
queue = LeakyQueue(maxsize=1)
tasks = [
    asyncio.ensure_future(consumer(queue, 0)),
    asyncio.ensure_future(producer(queue))
]
loop.run_until_complete(asyncio.gather(*tasks))