有效地使用多个 Asyncio 队列

Using Multiple Asyncio Queues Effectively

我目前正在构建一个项目,该项目需要向各种端点发出多个请求。我将这些请求包装在 Aiohttp 中以允许异步。

问题:
我有三个队列:queue1queue2queue3。此外,我还有三个辅助功能(worker1worker2worker3),它们与各自的队列相关联。第一个队列会立即填充 运行 之前已知的列表 ID。当请求完成并且数据提交到数据库时,它会将 ID 传递给 queue2worker2 将使用此 ID 并请求更多数据。根据这些数据,它将开始生成一个 ID 列表(不同于 queue1/queue2 中的 ID。worker2 会将 ID 放入 queue3。最后 worker3 将获取此 ID来自 queue3 并在提交到数据库之前请求更多数据。

问题出现的事实是 queue.join() 是一个阻塞调用。每个工作人员都绑定到一个单独的队列,因此 queue1 的连接将阻塞直到完成。这很好,但它也违背了使用异步的目的。如果不使用 join(),程序将无法检测队列何时完全为空。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。

基本代码大纲如下:

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))

    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))

    for i in range(10):
        tasks.append(asyncio.create_task(worker3(queue3)))

    for i in IDs:
       queue1.put_nowait(i)

    await asyncio.gather(*tasks)

工作函数处于无限循环中,等待项目进入队列。

当数据全部处理完后,将不会退出,程序将挂起。

有没有办法有效管理worker并妥善结束?

正如 中很好地解释的那样,Queue.join 用于在注入队列的所有工作完成时通知生产者。由于您的第一个队列 不知道 特定项目何时完成(它会成倍增加并分发到其他队列),因此 join 不是适合您的工具。

从您的代码来看,您的工作人员似乎只需要 运行 处理队列的初始项目。如果是这种情况,那么您可以使用关闭哨兵来通知工作人员退出。例如:

async with aiohttp.ClientSession() as session:

    # ... create tasks as above ...

    for i in IDs:
       queue1.put_nowait(i)
    queue1.put_nowait(None)  # no more work

    await asyncio.gather(*tasks)

这类似于您的原始代码,但带有明确的关闭请求。工人必须检测哨兵并做出相应反应:将其传播到下一个 queue/worker 并退出。例如,在 worker1:

while True:
    item = queue1.get()
    if item is None:
        # done with processing, propagate sentinel to worker2 and exit
        await queue2.put(None)
        break
    # ... process item as usual ...

在其他两个 worker 中执行相同操作(worker3 除外,因为没有下一个队列而不会传播)将导致所有三个任务在工作完成后完成。由于队列是先进先出的,工作人员可以在遇到哨兵后安全退出,因为他们知道没有物品被丢弃。显式关闭还将关闭队列与恰好为空的队列区分开来,从而防止工作人员由于临时空队列而过早退出。

直到 Python 3.7,此技术实际上 demonstratedQueue 的文档中,但该示例有点令人困惑 both Queue.join 的使用和关机哨兵的使用。两者是分开的,可以相互独立使用。 (并且将它们一起使用也可能有意义,例如使用 Queue.join 等待“里程碑”,然后将其他东西放入队列,同时保留用于停止工作人员的哨兵。)