Python Asyncio 生产者-消费者工作流拥塞/队列增长
Python Asyncio producer-consumer workflow congestion / growing queue
我一直在编写 Python 应用程序,其中:
- 有一个
async
函数 producer
监听通过 websocket 传入的 items
,并将这些 items
放入 queue = asyncio.Queue()
.
- 有一个
async
函数 consumer
执行 queue.get()
,并通过不同的 websocket 连接查询 item_details
。
问题:将传入的items
放入queue
的平均速度远高于consumer
的平均速度queue
中的 get
项,结果 queue
一段时间后堆积起来。
问题:在没有多处理和[=39=的情况下,放大consumer
的正确方法是什么]不限制传入连接?我对 asyncio
和 threading
还不是很熟练。我想到了 运行 consumer
在单独的工人中,但据我了解 asyncio 的 run_in_executor
不能用在 async
函数上,还有这个东西 asyncio.Queue()
不是线程安全的。
如果消费者执行 IO 绑定工作,您可以只缩放其计数。而且你不关心多线程,因为 asyncio
基于非阻塞 IO 的思想,并且设计为在单线程中工作。如果没有本机异步替代方案,您可以甚至必须使用线程来处理阻塞 IO,例如对于文件 IO,但这是一个单独的故事。
这里有一个简单的例子来说明生产者创建任务的速度比单个消费者处理任务的速度快的情况。我用 asyncio.sleep
.
模拟 IO 工作负载
import asyncio
import itertools
async def producer(queue: asyncio.Queue):
"""producer emulator, creates ~ 10 tasks per second"""
sleep_seconds=0.1
counter = itertools.count(1)
while True:
await queue.put(next(counter))
await asyncio.sleep(sleep_seconds)
async def consumer(queue: asyncio.Queue, index):
"""slow io-bound consumer emulator, process ~ 5 tasks per second"""
sleep_seconds=0.2
while True:
task = await queue.get()
print(f"consumer={index}, task={task}, queue_size={queue.qsize()}")
await asyncio.sleep(sleep_seconds)
async def main():
q = asyncio.Queue()
concurrency = 2 # consumers count
tasks = [asyncio.create_task(consumer(q, i)) for i in range(concurrency)]
tasks += [asyncio.create_task(producer(q))]
await asyncio.wait(tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
单个消费者的输出,队列大小不断增长
consumer=0, task=1, queue_size=0
consumer=0, task=2, queue_size=0
consumer=0, task=3, queue_size=1
consumer=0, task=4, queue_size=2
consumer=0, task=5, queue_size=3
consumer=0, task=6, queue_size=4
consumer=0, task=7, queue_size=5
consumer=0, task=8, queue_size=6
consumer=0, task=9, queue_size=7
consumer=0, task=10, queue_size=8
两个消费者的输出,队列为空
consumer=0, task=1, queue_size=0
consumer=1, task=2, queue_size=0
consumer=0, task=3, queue_size=0
consumer=1, task=4, queue_size=0
consumer=0, task=5, queue_size=0
consumer=1, task=6, queue_size=0
consumer=0, task=7, queue_size=0
consumer=1, task=8, queue_size=0
consumer=0, task=9, queue_size=0
consumer=1, task=10, queue_size=0
我一直在编写 Python 应用程序,其中:
- 有一个
async
函数producer
监听通过 websocket 传入的items
,并将这些items
放入queue = asyncio.Queue()
. - 有一个
async
函数consumer
执行queue.get()
,并通过不同的 websocket 连接查询item_details
。
问题:将传入的items
放入queue
的平均速度远高于consumer
的平均速度queue
中的 get
项,结果 queue
一段时间后堆积起来。
问题:在没有多处理和[=39=的情况下,放大consumer
的正确方法是什么]不限制传入连接?我对 asyncio
和 threading
还不是很熟练。我想到了 运行 consumer
在单独的工人中,但据我了解 asyncio 的 run_in_executor
不能用在 async
函数上,还有这个东西 asyncio.Queue()
不是线程安全的。
如果消费者执行 IO 绑定工作,您可以只缩放其计数。而且你不关心多线程,因为 asyncio
基于非阻塞 IO 的思想,并且设计为在单线程中工作。如果没有本机异步替代方案,您可以甚至必须使用线程来处理阻塞 IO,例如对于文件 IO,但这是一个单独的故事。
这里有一个简单的例子来说明生产者创建任务的速度比单个消费者处理任务的速度快的情况。我用 asyncio.sleep
.
import asyncio
import itertools
async def producer(queue: asyncio.Queue):
"""producer emulator, creates ~ 10 tasks per second"""
sleep_seconds=0.1
counter = itertools.count(1)
while True:
await queue.put(next(counter))
await asyncio.sleep(sleep_seconds)
async def consumer(queue: asyncio.Queue, index):
"""slow io-bound consumer emulator, process ~ 5 tasks per second"""
sleep_seconds=0.2
while True:
task = await queue.get()
print(f"consumer={index}, task={task}, queue_size={queue.qsize()}")
await asyncio.sleep(sleep_seconds)
async def main():
q = asyncio.Queue()
concurrency = 2 # consumers count
tasks = [asyncio.create_task(consumer(q, i)) for i in range(concurrency)]
tasks += [asyncio.create_task(producer(q))]
await asyncio.wait(tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
单个消费者的输出,队列大小不断增长
consumer=0, task=1, queue_size=0
consumer=0, task=2, queue_size=0
consumer=0, task=3, queue_size=1
consumer=0, task=4, queue_size=2
consumer=0, task=5, queue_size=3
consumer=0, task=6, queue_size=4
consumer=0, task=7, queue_size=5
consumer=0, task=8, queue_size=6
consumer=0, task=9, queue_size=7
consumer=0, task=10, queue_size=8
两个消费者的输出,队列为空
consumer=0, task=1, queue_size=0
consumer=1, task=2, queue_size=0
consumer=0, task=3, queue_size=0
consumer=1, task=4, queue_size=0
consumer=0, task=5, queue_size=0
consumer=1, task=6, queue_size=0
consumer=0, task=7, queue_size=0
consumer=1, task=8, queue_size=0
consumer=0, task=9, queue_size=0
consumer=1, task=10, queue_size=0