在 asyncio 循环中调用 run_in_executor() 以阻止代码的正确方法

Correct way to call run_in_executor() for blocking code in asyncio loop

在以下代码 (CPython 3.9) 中,我有一个正在侦听消息的异步客户端。收到每条消息后,如果满足某些条件,它会调用一个阻塞的 CPU 密集函数 gen_block.

问题 1:在下面的示例中,gen_block 阻塞了循环(即阻止 read_msg 读取更多消息),尽管被调用来自 run_in_executor()...我在这里缺少什么?

问题2:一旦调用了gen_block,我不想在原来的调用完成之前再次调用它。在 asyncio 框架中处理该状态的正确方法是什么?

async def main(args):
    reader, writer = await asyncio.open_connection(args.host, args.port
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                await loop.run_in_executor(None, gen_block, writer)
    
    # except, finally, shutdown etc

指向引用/SO 帖子的指针也可以;对于这种从循环中调用 run_in_executor 的特定场景,我找不到相关答案。

当您说 await loop.run_in_executor(None, gen_block, writer) 时,await 表示等待 loop_in_executor 完成其 CPU 繁重的任务,因此与直接调用 gen_block 没有任何不同在循环中,它将阻塞直到完成。您可以删除 await 关键字并跟踪列表中的任务,这样应该可以阻止阻塞,因为这已经在一个单独的任务中:

async def main(args):
    tasks = []
    reader, writer = await asyncio.open_connection(args.host, args.port
    try:
        while data := await read_msg(reader):
            handle_data(data, writer)
            if len(TXN_QUEUE) >= 3:
                tasks.append(loop.run_in_executor(None, gen_block, writer))

# do something with the tasks list later, like asyncio.gather

但是,鉴于您要求在第二次调用之前等待 gen_block 完成,队列可能是一个不错的方法。当您准备好进行 CPU 密集型工作时,您可以将完成这项工作所需的信息放入队列中。然后,您有一个工作任务 运行 您的 CPU 密集工作,它从队列中读取要完成的工作并 运行 在单独的线程中执行。由于只有一名工作人员,您一次只能 运行 一次调用您的 CPU 密集工作,而其他请求在其后排队。

这里有一个简单的例子来说明这个想法。我们在本地主机上创建一个服务器,它接受简单的文本消息,然后将它们放入队列中。然后,我们从队列中拉出一名工作人员,并在单独的线程中进行 运行 CPU 密集工作。请注意,我在这里使用 asyncio.to_thread,它是在 3.9 中引入的,并为您抽象出管理执行程序。

import asyncio
import functools
from asyncio import Queue


def cpu_intensive(data: str):
    print(f"Running intense work for {data}")
    i = 0
    while i < 100000000:
        i = i + 1
    print(f"Finished intense work for {data}")


async def worker(queue: Queue):
    while True:
        work_item = await queue.get()
        await asyncio.to_thread(cpu_intensive, work_item)


async def handle_connect(queue: Queue, reader, writer):
    while data := await reader.readline():
        print(f"Queueing {data}")
        queue.put_nowait(data)


async def main():
    queue = Queue()
    asyncio.create_task(worker(queue))
    server = await asyncio.start_server(functools.partial(handle_connect, queue), '127.0.0.1', 8000)

    async with server:
        await server.serve_forever()


asyncio.run(main())

运行 将 a、b、c 作为三个消息依次发送到您的服务器,您将看到类似于以下内容的输出:

Queueing b'a\r\n'
Running intense work for b'a\r\n'
Queueing b'b\r\n'
Queueing b'c\r\n'
Finished intense work for b'a\r\n'
Running intense work for b'b\r\n'
Finished intense work for b'b\r\n'
Running intense work for b'c\r\n'