asyncio/aiohttp - create_task() 阻塞事件循环,在 "This event loop is already running " 中收集结果
asyncio/aiohttp - create_task() blocks event loop, gather results in "This event loop is already running "
我无法同时获得我的消费者和生产者 运行,似乎 worker() 或 aiohttp 服务器正在阻塞 - 即使与 asyncio.gather()[ 同时执行也是如此=15=]
如果我改为 loop.create_task(worker),这将阻塞并且服务器将永远不会启动。
我已经尝试了所有我能想到的变体,包括 nest_asyncio 模块 - 但我只能得到两个组件之一 运行.
我做错了什么?
async def worker():
batch_size = 30
print("running worker")
while True:
if queue.qsize() > 0:
future_map = {}
size = min(queue.qsize(), batch_size)
batch = []
for _ in range(size):
item = await queue.get()
print("Item: "+str(item))
future_map[item["fname"]] = item["future"]
batch.append(item)
print("processing", batch)
results = await process_files(batch)
for dic in results:
for key, value in dic.items():
print(str(key)+":"+str(value))
future_map[key].set_result(value)
# mark the tasks done
for _ in batch:
queue.task_done()
def start_worker():
loop.create_task(worker())
def create_app():
app = web.Application()
routes = web.RouteTableDef()
@routes.post("/decode")
async def handle_post(request):
return await decode(request)
app.add_routes(routes)
app.on_startup.append(start_worker())
return app
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
app = create_app()
web.run_app(app)
上面打印了“运行 worker”并且没有启动 AIOHTTP 服务器。
def run(loop, app, port=8001):
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', port)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()
def main(app):
asyncio.gather(run(loop, app), worker())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
app = create_app()
main(app)
以上启动了服务器,但没有启动 worker。
虽然 await asyncio.sleep(0)
解决了眼前的问题,但这并不是一个理想的解决方案;事实上,它有点像 anti-pattern。要了解原因,让我们更详细地检查问题发生的原因。问题的核心是工作人员的 while
循环 - 一旦队列变空,它实际上归结为:
while True:
pass
当然,标记为 pass
的部分包含对 qsize()
的检查,如果队列为 non-empty,则导致执行附加代码,但是一旦 qsize()
首次到达0,该检查将始终评估为 false。这是因为 asyncio 是 single-threaded 而当 qsize() == 0
时,while
循环不再遇到单个 await
。如果没有 await
,就不可能放弃对可能填充队列的协程或回调的控制,并且 while
循环变得无限。
这就是循环内 await asyncio.sleep(0)
有帮助的原因:它强制进行上下文切换,确保其他协程将有机会进入 运行 并最终进入 re-populate 队列。然而,它也使 while
循环持续 运行ning,这意味着事件循环永远不会进入休眠状态,即使队列连续数小时保持空。只要 worker 处于活动状态,事件循环就会保持 busy-waiting 状态。您可以按照 dirn 的建议,通过将睡眠间隔调整为 non-zero 值来缓解 busy-wait ,但这会引入延迟,并且在没有 activity.
正确的解决方法是 不是 检查 qsize()
,而是使用 queue.get()
来获取下一项。它会根据需要休眠直到项目出现,并在它出现后立即唤醒协程。不要担心这会“阻塞”工作人员——这正是 asyncio 的要点,您可以拥有多个协程,而一个在 await 上被“阻塞”的协程只会让其他协程继续进行。例如:
async def worker():
batch_size = 30
while True:
# wait for an item and add it to the batch
batch = [await queue.get()]
# batch up more items if available
while not queue.empty() and len(batch) < batch_size:
batch.append(await queue.get())
# process the batch
future_map = {item["fname"]: item["future"] for item in batch}
results = await process_files(batch)
for dic in results:
for key, value in dic.items():
print(str(key)+":"+str(value))
future_map[key].set_result(value)
for _ in batch:
queue.task_done()
在这个变体中,我们在循环的每次迭代中等待一些东西,不需要休眠。
我无法同时获得我的消费者和生产者 运行,似乎 worker() 或 aiohttp 服务器正在阻塞 - 即使与 asyncio.gather()[ 同时执行也是如此=15=]
如果我改为 loop.create_task(worker),这将阻塞并且服务器将永远不会启动。
我已经尝试了所有我能想到的变体,包括 nest_asyncio 模块 - 但我只能得到两个组件之一 运行.
我做错了什么?
async def worker():
batch_size = 30
print("running worker")
while True:
if queue.qsize() > 0:
future_map = {}
size = min(queue.qsize(), batch_size)
batch = []
for _ in range(size):
item = await queue.get()
print("Item: "+str(item))
future_map[item["fname"]] = item["future"]
batch.append(item)
print("processing", batch)
results = await process_files(batch)
for dic in results:
for key, value in dic.items():
print(str(key)+":"+str(value))
future_map[key].set_result(value)
# mark the tasks done
for _ in batch:
queue.task_done()
def start_worker():
loop.create_task(worker())
def create_app():
app = web.Application()
routes = web.RouteTableDef()
@routes.post("/decode")
async def handle_post(request):
return await decode(request)
app.add_routes(routes)
app.on_startup.append(start_worker())
return app
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
app = create_app()
web.run_app(app)
上面打印了“运行 worker”并且没有启动 AIOHTTP 服务器。
def run(loop, app, port=8001):
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', port)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()
def main(app):
asyncio.gather(run(loop, app), worker())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
app = create_app()
main(app)
以上启动了服务器,但没有启动 worker。
虽然 await asyncio.sleep(0)
解决了眼前的问题,但这并不是一个理想的解决方案;事实上,它有点像 anti-pattern。要了解原因,让我们更详细地检查问题发生的原因。问题的核心是工作人员的 while
循环 - 一旦队列变空,它实际上归结为:
while True:
pass
当然,标记为 pass
的部分包含对 qsize()
的检查,如果队列为 non-empty,则导致执行附加代码,但是一旦 qsize()
首次到达0,该检查将始终评估为 false。这是因为 asyncio 是 single-threaded 而当 qsize() == 0
时,while
循环不再遇到单个 await
。如果没有 await
,就不可能放弃对可能填充队列的协程或回调的控制,并且 while
循环变得无限。
这就是循环内 await asyncio.sleep(0)
有帮助的原因:它强制进行上下文切换,确保其他协程将有机会进入 运行 并最终进入 re-populate 队列。然而,它也使 while
循环持续 运行ning,这意味着事件循环永远不会进入休眠状态,即使队列连续数小时保持空。只要 worker 处于活动状态,事件循环就会保持 busy-waiting 状态。您可以按照 dirn 的建议,通过将睡眠间隔调整为 non-zero 值来缓解 busy-wait ,但这会引入延迟,并且在没有 activity.
正确的解决方法是 不是 检查 qsize()
,而是使用 queue.get()
来获取下一项。它会根据需要休眠直到项目出现,并在它出现后立即唤醒协程。不要担心这会“阻塞”工作人员——这正是 asyncio 的要点,您可以拥有多个协程,而一个在 await 上被“阻塞”的协程只会让其他协程继续进行。例如:
async def worker():
batch_size = 30
while True:
# wait for an item and add it to the batch
batch = [await queue.get()]
# batch up more items if available
while not queue.empty() and len(batch) < batch_size:
batch.append(await queue.get())
# process the batch
future_map = {item["fname"]: item["future"] for item in batch}
results = await process_files(batch)
for dic in results:
for key, value in dic.items():
print(str(key)+":"+str(value))
future_map[key].set_result(value)
for _ in batch:
queue.task_done()
在这个变体中,我们在循环的每次迭代中等待一些东西,不需要休眠。