如何在 python 中实现异步工作队列?

how to implement asyncio worker queue in python?

我需要为 aiohttp 创建工作队列。

我现在正在使用 asyncio.gather,但它的工作方式不对:

这就是我想要做的:

第一个可以用下面的代码实现:

async def some_stuff(_):
    pass

tasks = []
for i in data:
    tasks.append(do_stuff(i))

asyncio.run(asyncio.gather(*tasks))

我需要

的例子

据我了解,您希望 运行 正好并行执行 5 个任务。当其中一项任务完成时,您想立即开始一项新任务。为此,asyncio.gather 不起作用,因为它会等待所有任务完成后再继续。

我的建议如下:

from collections import deque
import random
import asyncio

class RunSome:
    def __init__(self, task_count=5):
        self.task_count = task_count
        self.running = set()
        self.waiting = deque()
        
    @property
    def running_task_count(self):
        return len(self.running)
        
    def add_task(self, coro):
        if len(self.running) >= self.task_count:
            self.waiting.append(coro)
        else:
            self._start_task(coro)
        
    def _start_task(self, coro):
        self.running.add(coro)
        asyncio.create_task(self._task(coro))
        
    async def _task(self, coro):
        try:
            return await coro
        finally:
            self.running.remove(coro)
            if self.waiting:
                coro2 = self.waiting.popleft()
                self._start_task(coro2)
            
async def main():
    runner = RunSome()
    async def rand_delay():
        rnd = random.random() + 0.5
        print("Task started", asyncio.current_task().get_name(),
              runner.running_task_count)
        await asyncio.sleep(rnd)
        print("Task ended", asyncio.current_task().get_name(),
              runner.running_task_count)
    for _ in range(50):
        runner.add_task(rand_delay())
    # keep the program alive until all the tasks are done
    while runner.running_task_count > 0:
        await asyncio.sleep(0.1)
        
if __name__ == "__main__":
    asyncio.run(main())
        

输出:

Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1

协程是 Python 中的第一个 class 个对象。因此,它们可以放入列表和集合中。

所有任务创建都由 RunSome 处理。您将协程传递给它以执行。它知道当前有多少任务正在 运行ning,它决定是立即创建一个新任务还是将协程添加到待处理任务队列中。当一个任务完成时,它会从队列中抓取一个新的协程(如果有的话)。 运行ning 任务的数量永远不会超过传递给构造函数的阈值计数(默认值为 5)。任务是传递的协程的包装器。

您将必须弄清楚如何处理返回值(如果有)。这里的错误处理是基本的,但由于 try:finally: 块,它确实保持了正确数量的 运行ning 任务。