asyncio/aiohttp - create_task() 阻塞事件循环,在 "This event loop is already running " 中收集结果

asyncio/aiohttp - create_task() blocks event loop, gather results in "This event loop is already running "

我无法同时获得我的消费者和生产者 运行,似乎 worker() 或 aiohttp 服务器正在阻塞 - 即使与 asyncio.gather()[ 同时执行也是如此=15=]

如果我改为 loop.create_task(worker),这将阻塞并且服务器将永远不会启动。

我已经尝试了所有我能想到的变体,包括 nest_asyncio 模块 - 但我只能得到两个组件之一 运行.

我做错了什么?

async def worker():
    batch_size = 30

    print("running worker")
    while True:
        if queue.qsize() > 0:
            future_map = {}

            size = min(queue.qsize(), batch_size)
            batch = []
            for _ in range(size):
                item = await queue.get()
                print("Item: "+str(item))
                future_map[item["fname"]] = item["future"]
                batch.append(item)

            print("processing", batch)
            results = await process_files(batch)
            for dic in results:
                for key, value in dic.items():
                    print(str(key)+":"+str(value))
                    future_map[key].set_result(value)

            # mark the tasks done
            for _ in batch:
                queue.task_done()



def start_worker():
    loop.create_task(worker())

def create_app():
    app = web.Application()
    routes = web.RouteTableDef()
    @routes.post("/decode")
    async def handle_post(request):
        return await decode(request)
    app.add_routes(routes)
    app.on_startup.append(start_worker())
    return app

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    web.run_app(app)

上面打印了“运行 worker”并且没有启动 AIOHTTP 服务器。

def run(loop, app, port=8001):
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', port)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(handler.finish_connections(1.0))
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.finish())
loop.close()

def main(app):
    asyncio.gather(run(loop, app), worker())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    main(app)

以上启动了服务器,但没有启动 worker。

虽然 await asyncio.sleep(0) 解决了眼前的问题,但这并不是一个理想的解决方案;事实上,它有点像 anti-pattern。要了解原因,让我们更详细地检查问题发生的原因。问题的核心是工作人员的 while 循环 - 一旦队列变空,它实际上归结为:

while True:
    pass

当然,标记为 pass 的部分包含对 qsize() 的检查,如果队列为 non-empty,则导致执行附加代码,但是一旦 qsize() 首次到达0,该检查将始终评估为 false。这是因为 asyncio 是 single-threaded 而当 qsize() == 0 时,while 循环不再遇到单个 await。如果没有 await,就不可能放弃对可能填充队列的协程或回调的控制,并且 while 循环变得无限。

这就是循环内 await asyncio.sleep(0) 有帮助的原因:它强制进行上下文切换,确保其他协程将有机会进入 运行 并最终进入 re-populate 队列。然而,它也使 while 循环持续 运行ning,这意味着事件循环永远不会进入休眠状态,即使队列连续数小时保持空。只要 worker 处于活动状态,事件循环就会保持 busy-waiting 状态。您可以按照 dirn 的建议,通过将睡眠间隔调整为 non-zero 值来缓解 busy-wait ,但这会引入延迟,并且在没有 activity.

正确的解决方法是 不是 检查 qsize(),而是使用 queue.get() 来获取下一项。它会根据需要休眠直到项目出现,并在它出现后立即唤醒协程。不要担心这会“阻塞”工作人员——这正是 asyncio 的要点,您可以拥有多个协程,而一个在 await 上被“阻塞”的协程只会让其他协程继续进行。例如:

async def worker():
    batch_size = 30

    while True:
        # wait for an item and add it to the batch
        batch = [await queue.get()]
        # batch up more items if available
        while not queue.empty() and len(batch) < batch_size:
            batch.append(await queue.get())
        # process the batch
        future_map = {item["fname"]: item["future"] for item in batch}
        results = await process_files(batch)
        for dic in results:
            for key, value in dic.items():
                print(str(key)+":"+str(value))
                future_map[key].set_result(value)
        for _ in batch:
            queue.task_done()

在这个变体中,我们在循环的每次迭代中等待一些东西,不需要休眠。