如何使用 Python asyncio 在 asyncpg API 上建立同步外观?
How can I have a synchronous facade over asyncpg APIs with Python asyncio?
想象一个异步 aiohttp
Web 应用程序,该应用程序由通过 asyncpg
连接的 Postgresql 数据库支持并且不执行其他 I/O。如何让中间层托管应用程序逻辑,即 not 异步? (我知道我可以简单地让所有东西都异步——但想象一下我的应用程序有 大量 应用程序逻辑,只受数据库 I/O 的约束,我无法触及它的所有内容)。
伪代码:
async def handler(request):
# call into layers over layers of application code, that simply emits SQL
...
def application_logic():
...
# This doesn't work, obviously, as await is a syntax
# error inside synchronous code.
data = await asyncpg_conn.execute("SQL")
...
# What I want is this:
data = asyncpg_facade.execute("SQL")
...
如何在 asyncpg
上构建一个允许应用程序逻辑调用数据库的同步外观?在这种情况下,使用 async.run()
或 asyncio.run_coroutine_threadsafe()
等浮动的方法不起作用,因为我们来自一个已经异步的上下文。我认为这不可能是不可能的,因为原则上已经有一个事件循环 运行 asyncpg
协程。
附加问题:使 await
inside sync 成为语法错误的设计原理是什么?允许 await
来自协程的任何上下文不是很有用吗,这样我们就有了简单的方法来分解功能构建块中的应用程序?
编辑 额外奖励:超出 ,留在“安全区”内,我会对避免阻塞主线程的解决方案感兴趣(领先到更 gevent-ish 的东西)。另请参阅我对保罗的回答的评论...
您需要在 运行 您的异步代码中创建一个辅助线程。你用它自己的事件循环初始化辅助线程,它永远 运行s。通过调用 run_coroutine_threadsafe() 并在 returned 对象上调用 result() 来执行每个异步函数。这是 concurrent.futures.Future 的一个实例,它的 result() 方法不会 return 直到协程的结果从辅助线程准备好。
然后,您的主线程实际上会像调用同步函数一样调用每个异步函数。在每个函数调用完成之前,主线程不会继续。顺便说一句,你的同步功能是否真的 运行ning 在事件循环上下文中并不重要。
对 result() 的调用当然会阻塞主线程的事件循环。如果您想从同步代码中获得 运行ning 异步函数的效果,这是无法避免的。
不用说,这是一件很丑陋的事情,它暗示了错误的程序结构。但是您正在尝试转换遗留程序,这可能会有所帮助。
import asyncio
import threading
from datetime import datetime
def main():
def thr(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop = asyncio.new_event_loop()
t = threading.Thread(target=thr, args=(loop, ), daemon=True)
t.start()
print("Hello", datetime.now())
t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
print(t1, t2)
if __name__ == "__main__":
main()
>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0
想象一个异步 aiohttp
Web 应用程序,该应用程序由通过 asyncpg
连接的 Postgresql 数据库支持并且不执行其他 I/O。如何让中间层托管应用程序逻辑,即 not 异步? (我知道我可以简单地让所有东西都异步——但想象一下我的应用程序有 大量 应用程序逻辑,只受数据库 I/O 的约束,我无法触及它的所有内容)。
伪代码:
async def handler(request):
# call into layers over layers of application code, that simply emits SQL
...
def application_logic():
...
# This doesn't work, obviously, as await is a syntax
# error inside synchronous code.
data = await asyncpg_conn.execute("SQL")
...
# What I want is this:
data = asyncpg_facade.execute("SQL")
...
如何在 asyncpg
上构建一个允许应用程序逻辑调用数据库的同步外观?在这种情况下,使用 async.run()
或 asyncio.run_coroutine_threadsafe()
等浮动的方法不起作用,因为我们来自一个已经异步的上下文。我认为这不可能是不可能的,因为原则上已经有一个事件循环 运行 asyncpg
协程。
附加问题:使 await
inside sync 成为语法错误的设计原理是什么?允许 await
来自协程的任何上下文不是很有用吗,这样我们就有了简单的方法来分解功能构建块中的应用程序?
编辑 额外奖励:超出
您需要在 运行 您的异步代码中创建一个辅助线程。你用它自己的事件循环初始化辅助线程,它永远 运行s。通过调用 run_coroutine_threadsafe() 并在 returned 对象上调用 result() 来执行每个异步函数。这是 concurrent.futures.Future 的一个实例,它的 result() 方法不会 return 直到协程的结果从辅助线程准备好。
然后,您的主线程实际上会像调用同步函数一样调用每个异步函数。在每个函数调用完成之前,主线程不会继续。顺便说一句,你的同步功能是否真的 运行ning 在事件循环上下文中并不重要。
对 result() 的调用当然会阻塞主线程的事件循环。如果您想从同步代码中获得 运行ning 异步函数的效果,这是无法避免的。
不用说,这是一件很丑陋的事情,它暗示了错误的程序结构。但是您正在尝试转换遗留程序,这可能会有所帮助。
import asyncio
import threading
from datetime import datetime
def main():
def thr(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop = asyncio.new_event_loop()
t = threading.Thread(target=thr, args=(loop, ), daemon=True)
t.start()
print("Hello", datetime.now())
t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
print(t1, t2)
if __name__ == "__main__":
main()
>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0