
How can I wrap a synchronous function in an async coroutine?

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

最终我在this thread. The method I was looking for is run_in_executor中找到了答案。这允许同步函数 运行 异步而不阻塞事件循环。

在我上面发布的 sleep 示例中,它可能看起来像这样:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

另请参阅以下答案 ->


import time
from functools import wraps, partial

def wrap(func):
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

def sleep_async(delay):
    return 'I slept asynchronously'


或使用 aioify

% pip install aioify


def sleep_async(delay):


def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run


要安装库,运行 这个命令:

$ pip install awaits

要使您的任何函数异步,只需向其添加 @awaitable 装饰器,如下所示:

import time
import asyncio
from awaits.awaitable import awaitable

def sum(a, b):
  # heavy load simulation
  return a + b


print(asyncio.run(sum(2, 2)))

“在幕后”您的函数将在线程池中执行。每次调用您的函数时都不会重新创建此线程池。线程池创建一次并通过队列接受新任务。这将使您的程序 运行 比使用其他解决方案更快,因为创建额外的线程是额外的开销。


import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
        self.executor =  executor
    def __call__(self, blocking):
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
            return await loop.run_in_executor(self.executor,func)

        return wrapper

def sync(*args, **kwargs):
    print(args, kwargs)
asyncio.run(sync("hello", "world", result=True))

从 python 3.9 开始,最干净的方法是使用 asyncio.to_thread 方法,这基本上是 run_in_executor 的快捷方式,但保留所有上下文变量。

此外,请考虑 GIL,因为它是一个 to_thread。您仍然可以 运行 CPU-bound 任务来完成 numpy 之类的任务。来自文档:

Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.