如何使用 Python asyncio 在 asyncpg API 上建立同步外观?

How can I have a synchronous facade over asyncpg APIs with Python asyncio?

想象一个异步 aiohttp Web 应用程序,该应用程序由通过 asyncpg 连接的 Postgresql 数据库支持并且不执行其他 I/O。如何让中间层托管应用程序逻辑,即 not 异步? (我知道我可以简单地让所有东西都异步——但想象一下我的应用程序有 大量 应用程序逻辑,只受数据库 I/O 的约束,我无法触及它的所有内容)。

伪代码:

async def handler(request):
    # call into layers over layers of application code, that simply emits SQL
    ...

def application_logic():
    ...
    # This doesn't work, obviously, as await is a syntax
    # error inside synchronous code.
    data = await asyncpg_conn.execute("SQL")
    ...
    # What I want is this:
    data = asyncpg_facade.execute("SQL")
    ...

如何在 asyncpg 上构建一个允许应用程序逻辑调用数据库的同步外观?在这种情况下,使用 async.run()asyncio.run_coroutine_threadsafe() 等浮动的方法不起作用,因为我们来自一个已经异步的上下文。我认为这不可能是不可能的,因为原则上已经有一个事件循环 运行 asyncpg 协程。

附加问题:使 await inside sync 成为语法错误的设计原理是什么?允许 await 来自协程的任何上下文不是很有用吗,这样我们就有了简单的方法来分解功能构建块中的应用程序?

编辑 额外奖励:超出 ,留在“安全区”内,我会对避免阻塞主线程的解决方案感兴趣(领先到更 gevent-ish 的东西)。另请参阅我对保罗的回答的评论...

您需要在 运行 您的异步代码中创建一个辅助线程。你用它自己的事件循环初始化辅助线程,它永远 运行s。通过调用 run_coroutine_threadsafe() 并在 returned 对象上调用 result() 来执行每个异步函数。这是 concurrent.futures.Future 的一个实例,它的 result() 方法不会 return 直到协程的结果从辅助线程准备好。

然后,您的主线程实际上会像调用同步函数一样调用每个异步函数。在每个函数调用完成之前,主线程不会继续。顺便说一句,你的同步功能是否真的 运行ning 在事件循环上下文中并不重要。

对 result() 的调用当然会阻塞主线程的事件循环。如果您想从同步代码中获得 运行ning 异步函数的效果,这是无法避免的。

不用说,这是一件很丑陋的事情,它暗示了错误的程序结构。但是您正在尝试转换遗留程序,这可能会有所帮助。

import asyncio
import threading
from datetime import datetime

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Hello", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
    print(t1, t2)
 

if __name__ == "__main__":
    main()

>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0