Python asyncio - 使用 asyncio.Event() 阻塞消费者
Python asyncio - consumer blocking with asyncio.Event()
我有一个程序有一个生产者和两个 慢 消费者,我想用协程重写它,这样每个消费者将处理 只为它生成的最后一个值(即跳过处理旧值时生成的新值)(我使用线程和threading.Queue()
但它在put()
上阻塞,因为队列最满当时)。
阅读后我决定使用asyncio.Event
和asyncio.Queue
。我写了这个原型程序:
import asyncio
async def l(event, q):
h = 1
while True:
# ready
event.set()
# get value to process
a = await q.get()
# process it
print(a * h)
h *= 2
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
我注意到 l
协程在 q.get()
行阻塞并且不打印任何内容。
在两者中添加 asyncio.sleep()
后,它按预期工作(我得到 1,11,21,...):
import asyncio
import time
async def l(event, q):
h = 1
a = 1
event.set()
while True:
# await asyncio.sleep(1)
a = await q.get()
# process it
await asyncio.sleep(1)
print(a * h)
event.set()
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
await asyncio.sleep(0.1)
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
...但我正在寻找没有它的解决方案。
为什么会这样?我该如何解决?我想我不能从 m
调用 await l()
,因为它们都有状态(在原始程序中,第一个使用 PyGame 绘制解决方案,第二个绘制结果)。
代码没有像任务那样按预期工作运行 m 函数从未停止。如果 event.is_set() == False,任务将继续递增 i。因为这个任务永远不会被挂起,所以任务 运行 函数 l 永远不会被调用。因此,你需要一个方法来暂停任务运行的函数m。一种暂停方式是等待另一个协程,这就是 asyncio.sleep 按预期工作的原因。
我认为下面的代码可以按您的预期工作。 LeakyQueue 将确保消费者只处理来自生产者的最后一个值。由于复杂度非常对称,消费者将消费生产者生产的所有价值。如果增加延迟参数,可以模拟消费者只处理生产者最后创建的值。
import asyncio
class LeakyQueue(asyncio.Queue):
async def put(self, item):
if self.full():
await self.get()
await super().put(item)
async def consumer(queue, delay=0):
h = 1
while True:
a = await queue.get()
if delay:
await asyncio.sleep(delay)
print ('consumer', a)
h += 2
async def producer(queue):
i = 1
while True:
await asyncio.ensure_future(queue.put(i))
print ('producer', i)
i += 1
loop = asyncio.get_event_loop()
queue = LeakyQueue(maxsize=1)
tasks = [
asyncio.ensure_future(consumer(queue, 0)),
asyncio.ensure_future(producer(queue))
]
loop.run_until_complete(asyncio.gather(*tasks))
我有一个程序有一个生产者和两个 慢 消费者,我想用协程重写它,这样每个消费者将处理 只为它生成的最后一个值(即跳过处理旧值时生成的新值)(我使用线程和threading.Queue()
但它在put()
上阻塞,因为队列最满当时)。
阅读asyncio.Event
和asyncio.Queue
。我写了这个原型程序:
import asyncio
async def l(event, q):
h = 1
while True:
# ready
event.set()
# get value to process
a = await q.get()
# process it
print(a * h)
h *= 2
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
我注意到 l
协程在 q.get()
行阻塞并且不打印任何内容。
在两者中添加 asyncio.sleep()
后,它按预期工作(我得到 1,11,21,...):
import asyncio
import time
async def l(event, q):
h = 1
a = 1
event.set()
while True:
# await asyncio.sleep(1)
a = await q.get()
# process it
await asyncio.sleep(1)
print(a * h)
event.set()
async def m(event, q):
i = 1
while True:
# pass element to consumer, when it's ready
if event.is_set():
await q.put(i)
event.clear()
await asyncio.sleep(0.1)
# produce value
i += 1
el = asyncio.get_event_loop()
ev = asyncio.Event()
qu = asyncio.Queue(2)
tasks = [
asyncio.ensure_future(l(ev, qu)),
asyncio.ensure_future(m(ev, qu))
]
el.run_until_complete(asyncio.gather(*tasks))
el.close()
...但我正在寻找没有它的解决方案。
为什么会这样?我该如何解决?我想我不能从 m
调用 await l()
,因为它们都有状态(在原始程序中,第一个使用 PyGame 绘制解决方案,第二个绘制结果)。
代码没有像任务那样按预期工作运行 m 函数从未停止。如果 event.is_set() == False,任务将继续递增 i。因为这个任务永远不会被挂起,所以任务 运行 函数 l 永远不会被调用。因此,你需要一个方法来暂停任务运行的函数m。一种暂停方式是等待另一个协程,这就是 asyncio.sleep 按预期工作的原因。
我认为下面的代码可以按您的预期工作。 LeakyQueue 将确保消费者只处理来自生产者的最后一个值。由于复杂度非常对称,消费者将消费生产者生产的所有价值。如果增加延迟参数,可以模拟消费者只处理生产者最后创建的值。
import asyncio
class LeakyQueue(asyncio.Queue):
async def put(self, item):
if self.full():
await self.get()
await super().put(item)
async def consumer(queue, delay=0):
h = 1
while True:
a = await queue.get()
if delay:
await asyncio.sleep(delay)
print ('consumer', a)
h += 2
async def producer(queue):
i = 1
while True:
await asyncio.ensure_future(queue.put(i))
print ('producer', i)
i += 1
loop = asyncio.get_event_loop()
queue = LeakyQueue(maxsize=1)
tasks = [
asyncio.ensure_future(consumer(queue, 0)),
asyncio.ensure_future(producer(queue))
]
loop.run_until_complete(asyncio.gather(*tasks))