PIL 和使用 asyncio 的阻塞调用

PIL and blocking calls with asyncio

我有一个异步应用程序,它使用来自 aiohttp 的服务器和带有 asyncio.open_connection()

的异步套接字

我的代码包含一些来自 PIL 库的阻塞调用,例如

Image.save()
Image.resize()
  1. 即使调用没有阻塞太长时间,如果我使用这些阻塞调用,我的 Web 服务器是否会冻结?更准确地说,事件循环是否有可能因为阻塞代码而错过事件?
  2. 如果是,这些与 asyncio 集成的功能的替代品是什么? PIL 没有异步版本。
  3. 一般来说,asyncio 中的 'blocking code' 是什么?除了套接字、读取文件等明显的操作
    例如, os.path.join() 是否被认为可以?如何处理 numpy 数组?

can my web server freeze if I use these blocking calls? More precisely, is it possible that the event loop will miss events because of blocking code?

服务器在执行图像函数时会精确卡顿。您不会错过任何事件,但所有事件处理都会在图像函数执行时延迟。

冻结事件循环是一种糟糕的情况 - 你应该避免它。

If yes, what is the replacement for these functions, that integrate with asyncio? there is no asyncio version of PIL.

避免冻结事件循环的最简单和通用的方法 - 使用 asyncio.run_in_executor 在另一个线程或另一个进程中执行阻塞函数。那里的代码片段展示了如何做到这一点,并很好地解释了何时使用进程或线程:

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

我只想补充一点,进程池可能并不总是适合每个 CPU 绑定操作的好解决方案。如果您的图像函数不需要太多时间(或者特别是如果您的服务器没有多个处理器内核),那么在一个线程中 运行 它们可能仍然更有效率。

In general, what is considered a 'blocking code' in asyncio? besides the obvious operations like socket, read a file, etc. For example, does os.path.join() is considered ok? what about working on a numpy array?

粗略地说任何函数都是阻塞的:它会阻塞事件循环一段时间。但是像 os.path.join 这样的许多功能只需要很少的时间,所以它们不是问题,我们不称它们为“阻塞”。

很难说出执行时间(和事件循环冻结)成为问题时的确切限制,特别是考虑到不同硬件的时间会有所不同。我有偏见的建议 - 如果您的代码在将控制权返回到事件循环之前需要(或可能需要)> 50 毫秒,请考虑它阻塞并使用 run_in_executor.

更新:

Thanks, does it make sense to use one event loop (of the main thread), and using another thread that will add tasks using the same loop?

我不确定你在这里的意思,但我认为不是。我们需要另一个线程来 运行 一些工作,而不是在那里添加任务。

I need some way for the thread to inform the main thread after the image processing is completed`

只需等待 run_in_executor 的结果或使用它开始任务。 run_in_executor - 是一个在后台线程中执行某些操作而不会阻塞事件循环的协程。

它看起来像这样:

thread_pool = ThreadPoolExecutor()


def process_image(img):
    # all stuff to process image here
    img.save()
    img.resize()


async def async_image_process(img):
    await loop.run_in_executor(
        thread_pool, 
        partial(process_image, img)
    )


async def handler(request):

    asyncio.create_task(
        async_image_process(img)
    )
    # we use a task to return the response immediately,
    # read 

    return web.Response(text="Image processed without blocking other requests")