如何在 python 中实现异步工作队列?
how to implement asyncio worker queue in python?
我需要为 aiohttp 创建工作队列。
我现在正在使用 asyncio.gather,但它的工作方式不对:
这就是我想要做的:
第一个可以用下面的代码实现:
async def some_stuff(_):
pass
tasks = []
for i in data:
tasks.append(do_stuff(i))
asyncio.run(asyncio.gather(*tasks))
我需要
的例子
据我了解,您希望 运行 正好并行执行 5 个任务。当其中一项任务完成时,您想立即开始一项新任务。为此,asyncio.gather
不起作用,因为它会等待所有任务完成后再继续。
我的建议如下:
from collections import deque
import random
import asyncio
class RunSome:
def __init__(self, task_count=5):
self.task_count = task_count
self.running = set()
self.waiting = deque()
@property
def running_task_count(self):
return len(self.running)
def add_task(self, coro):
if len(self.running) >= self.task_count:
self.waiting.append(coro)
else:
self._start_task(coro)
def _start_task(self, coro):
self.running.add(coro)
asyncio.create_task(self._task(coro))
async def _task(self, coro):
try:
return await coro
finally:
self.running.remove(coro)
if self.waiting:
coro2 = self.waiting.popleft()
self._start_task(coro2)
async def main():
runner = RunSome()
async def rand_delay():
rnd = random.random() + 0.5
print("Task started", asyncio.current_task().get_name(),
runner.running_task_count)
await asyncio.sleep(rnd)
print("Task ended", asyncio.current_task().get_name(),
runner.running_task_count)
for _ in range(50):
runner.add_task(rand_delay())
# keep the program alive until all the tasks are done
while runner.running_task_count > 0:
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
输出:
Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1
协程是 Python 中的第一个 class 个对象。因此,它们可以放入列表和集合中。
所有任务创建都由 RunSome
处理。您将协程传递给它以执行。它知道当前有多少任务正在 运行ning,它决定是立即创建一个新任务还是将协程添加到待处理任务队列中。当一个任务完成时,它会从队列中抓取一个新的协程(如果有的话)。 运行ning 任务的数量永远不会超过传递给构造函数的阈值计数(默认值为 5)。任务是传递的协程的包装器。
您将必须弄清楚如何处理返回值(如果有)。这里的错误处理是基本的,但由于 try:finally: 块,它确实保持了正确数量的 运行ning 任务。
我需要为 aiohttp 创建工作队列。
我现在正在使用 asyncio.gather,但它的工作方式不对:
这就是我想要做的:
第一个可以用下面的代码实现:
async def some_stuff(_):
pass
tasks = []
for i in data:
tasks.append(do_stuff(i))
asyncio.run(asyncio.gather(*tasks))
我需要
的例子据我了解,您希望 运行 正好并行执行 5 个任务。当其中一项任务完成时,您想立即开始一项新任务。为此,asyncio.gather
不起作用,因为它会等待所有任务完成后再继续。
我的建议如下:
from collections import deque
import random
import asyncio
class RunSome:
def __init__(self, task_count=5):
self.task_count = task_count
self.running = set()
self.waiting = deque()
@property
def running_task_count(self):
return len(self.running)
def add_task(self, coro):
if len(self.running) >= self.task_count:
self.waiting.append(coro)
else:
self._start_task(coro)
def _start_task(self, coro):
self.running.add(coro)
asyncio.create_task(self._task(coro))
async def _task(self, coro):
try:
return await coro
finally:
self.running.remove(coro)
if self.waiting:
coro2 = self.waiting.popleft()
self._start_task(coro2)
async def main():
runner = RunSome()
async def rand_delay():
rnd = random.random() + 0.5
print("Task started", asyncio.current_task().get_name(),
runner.running_task_count)
await asyncio.sleep(rnd)
print("Task ended", asyncio.current_task().get_name(),
runner.running_task_count)
for _ in range(50):
runner.add_task(rand_delay())
# keep the program alive until all the tasks are done
while runner.running_task_count > 0:
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
输出:
Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1
协程是 Python 中的第一个 class 个对象。因此,它们可以放入列表和集合中。
所有任务创建都由 RunSome
处理。您将协程传递给它以执行。它知道当前有多少任务正在 运行ning,它决定是立即创建一个新任务还是将协程添加到待处理任务队列中。当一个任务完成时,它会从队列中抓取一个新的协程(如果有的话)。 运行ning 任务的数量永远不会超过传递给构造函数的阈值计数(默认值为 5)。任务是传递的协程的包装器。
您将必须弄清楚如何处理返回值(如果有)。这里的错误处理是基本的,但由于 try:finally: 块,它确实保持了正确数量的 运行ning 任务。