如何在异步协程中包装同步函数?
How can I wrap a synchronous function in an async coroutine?
我正在使用 aiohttp 构建一个 API 服务器,该服务器将 TCP 请求发送到单独的服务器。发送 TCP 请求的模块是同步的,对我来说是一个黑盒子。所以我的问题是这些请求阻塞了整个 API。我需要一种方法将模块请求包装在一个不会阻塞 API.
其余部分的异步协程中
所以,仅使用 sleep
作为一个简单的例子,有什么方法可以将耗时的同步代码包装在非阻塞协程中,像这样:
async def sleep_async(delay):
# After calling sleep, loop should be released until sleep is done
yield sleep(delay)
return 'I slept asynchronously'
最终我在this thread. The method I was looking for is run_in_executor中找到了答案。这允许同步函数 运行 异步而不阻塞事件循环。
在我上面发布的 sleep
示例中,它可能看起来像这样:
import asyncio
from time import sleep
async def sleep_async(loop, delay):
# None uses the default executor (ThreadPoolExecutor)
await loop.run_in_executor(None, sleep, delay)
return 'I slept asynchronously'
另请参阅以下答案 ->
您可以使用装饰器将同步版本包装到异步版本。
import time
from functools import wraps, partial
def wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
@wrap
def sleep_async(delay):
time.sleep(delay)
return 'I slept asynchronously'
过时了,aioify是维护模式
或使用 aioify 库
% pip install aioify
然后
@aioify
def sleep_async(delay):
pass
不确定是否为时已晚,但您也可以使用装饰器在线程中执行您的功能。尽管如此,请注意它仍然是非合作阻塞,不像异步是合作阻塞。
def wrap(func):
from concurrent.futures import ThreadPoolExecutor
pool=ThreadPoolExecutor()
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
future=pool.submit(func, *args, **kwargs)
return asyncio.wrap_future(future)
return run
也许有人需要我解决这个问题。我编写了自己的库来解决这个问题,它允许您使用装饰器使任何函数异步。
要安装库,运行 这个命令:
$ pip install awaits
要使您的任何函数异步,只需向其添加 @awaitable 装饰器,如下所示:
import time
import asyncio
from awaits.awaitable import awaitable
@awaitable
def sum(a, b):
# heavy load simulation
time.sleep(10)
return a + b
现在你可以确定你的函数真的是异步协程了:
print(asyncio.run(sum(2, 2)))
“在幕后”您的函数将在线程池中执行。每次调用您的函数时都不会重新创建此线程池。线程池创建一次并通过队列接受新任务。这将使您的程序 运行 比使用其他解决方案更快,因为创建额外的线程是额外的开销。
装饰器对这种情况很有用,运行你在另一个线程中的阻塞函数。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union
class to_async:
def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
self.executor = executor
def __call__(self, blocking):
@wraps(blocking)
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
if not self.executor:
self.executor = ThreadPoolExecutor()
func = partial(blocking, *args, **kwargs)
return await loop.run_in_executor(self.executor,func)
return wrapper
@to_async(executor=None)
def sync(*args, **kwargs):
print(args, kwargs)
asyncio.run(sync("hello", "world", result=True))
从 python 3.9 开始,最干净的方法是使用 asyncio.to_thread 方法,这基本上是 run_in_executor
的快捷方式,但保留所有上下文变量。
此外,请考虑 GIL,因为它是一个 to_thread。您仍然可以 运行 CPU-bound 任务来完成 numpy
之类的任务。来自文档:
Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.
我正在使用 aiohttp 构建一个 API 服务器,该服务器将 TCP 请求发送到单独的服务器。发送 TCP 请求的模块是同步的,对我来说是一个黑盒子。所以我的问题是这些请求阻塞了整个 API。我需要一种方法将模块请求包装在一个不会阻塞 API.
其余部分的异步协程中所以,仅使用 sleep
作为一个简单的例子,有什么方法可以将耗时的同步代码包装在非阻塞协程中,像这样:
async def sleep_async(delay):
# After calling sleep, loop should be released until sleep is done
yield sleep(delay)
return 'I slept asynchronously'
最终我在this thread. The method I was looking for is run_in_executor中找到了答案。这允许同步函数 运行 异步而不阻塞事件循环。
在我上面发布的 sleep
示例中,它可能看起来像这样:
import asyncio
from time import sleep
async def sleep_async(loop, delay):
# None uses the default executor (ThreadPoolExecutor)
await loop.run_in_executor(None, sleep, delay)
return 'I slept asynchronously'
另请参阅以下答案 ->
您可以使用装饰器将同步版本包装到异步版本。
import time
from functools import wraps, partial
def wrap(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
@wrap
def sleep_async(delay):
time.sleep(delay)
return 'I slept asynchronously'
过时了,aioify是维护模式
或使用 aioify 库
% pip install aioify
然后
@aioify
def sleep_async(delay):
pass
不确定是否为时已晚,但您也可以使用装饰器在线程中执行您的功能。尽管如此,请注意它仍然是非合作阻塞,不像异步是合作阻塞。
def wrap(func):
from concurrent.futures import ThreadPoolExecutor
pool=ThreadPoolExecutor()
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
future=pool.submit(func, *args, **kwargs)
return asyncio.wrap_future(future)
return run
也许有人需要我解决这个问题。我编写了自己的库来解决这个问题,它允许您使用装饰器使任何函数异步。
要安装库,运行 这个命令:
$ pip install awaits
要使您的任何函数异步,只需向其添加 @awaitable 装饰器,如下所示:
import time
import asyncio
from awaits.awaitable import awaitable
@awaitable
def sum(a, b):
# heavy load simulation
time.sleep(10)
return a + b
现在你可以确定你的函数真的是异步协程了:
print(asyncio.run(sum(2, 2)))
“在幕后”您的函数将在线程池中执行。每次调用您的函数时都不会重新创建此线程池。线程池创建一次并通过队列接受新任务。这将使您的程序 运行 比使用其他解决方案更快,因为创建额外的线程是额外的开销。
装饰器对这种情况很有用,运行你在另一个线程中的阻塞函数。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union
class to_async:
def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
self.executor = executor
def __call__(self, blocking):
@wraps(blocking)
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
if not self.executor:
self.executor = ThreadPoolExecutor()
func = partial(blocking, *args, **kwargs)
return await loop.run_in_executor(self.executor,func)
return wrapper
@to_async(executor=None)
def sync(*args, **kwargs):
print(args, kwargs)
asyncio.run(sync("hello", "world", result=True))
从 python 3.9 开始,最干净的方法是使用 asyncio.to_thread 方法,这基本上是 run_in_executor
的快捷方式,但保留所有上下文变量。
此外,请考虑 GIL,因为它是一个 to_thread。您仍然可以 运行 CPU-bound 任务来完成 numpy
之类的任务。来自文档:
Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.