为同步执行包装 python async

Wrapping python async for synchronous execution

我正在尝试尽快从本地 Postgres 数据库加载数据,看来性能最高的 python 包是 asyncpg。我的代码是同步的,我反复需要加载数据块。我对让 async 关键字传播到我编写的每个函数不感兴趣,所以我试图将异步代码包装在同步函数中。

下面的代码有效,但非常丑陋:

def connect_to_postgres(user, password, database, host):
    async def wrapped():
        return await asyncpg.connect(user=keys['user'], password=keys['password'],
                                    database='markets', host='127.0.0.1')
    loop = asyncio.get_event_loop()    
    db_connection = loop.run_until_complete(wrapped())
    return db_connection
    
db_connection = connect_to_postgres(keys['user'], keys['password'],
                                    'db', '127.0.0.1')

def fetch_from_postgres(query, db_connection):
    async def wrapped():
        return await db_connection.fetch(query)
    loop = asyncio.get_event_loop()    
    values = loop.run_until_complete(wrapped())
    return values

fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)

在 Julia 我会做类似的事情

f() = @async 5
g() = fetch(f())
g()

但在 Python 中,我似乎不得不做一些笨拙的事情,

async def f():
  return 5
def g():
  loop = asyncio.get_event_loop()    
  return loop.run_until_complete(f())

只是想知道是否有更好的方法?

编辑:后一个python例子当然可以用

来写
def fetch(x):
    loop = asyncio.get_event_loop()    
    return loop.run_until_complete(x)

尽管如此,除非我遗漏了什么,否则仍然需要创建一个异步包装函数。

编辑 2:我确实关心性能,但希望使用同步编程方法。 asyncpg 比 psycopg2 快 3 倍,因为它的核心实现是在 Cython 而不是 Python 中,这在 https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/ 中有更详细的解释。因此我希望包装这个异步代码。

编辑 3:提出这个问题的另一种方式是在 python 中避免 "what color is your function" 的最佳方法是什么?

如果一开始就设置好程序结构,这并不难。您创建第二个线程,您的异步代码将在其中 运行,并启动其事件循环。当保持完全同步的主线程需要异步调用(协程)的结果时,可以使用方法 asyncio.run_coroutine_threadsafe。该方法 returns 一个 concurrent.futures.Future 对象。您可以通过调用其方法 result() 来获取返回值,该方法会阻塞直到结果可用。

这几乎就像您像调用子例程一样调用异步方法。开销很小,因为您只创建了一个辅助线程。这是一个简单的例子:

import asyncio
import threading
from datetime import datetime

async def demo(t):
    await asyncio.sleep(t)
    print(f"Demo function {t} {datetime.now()}")
    return t

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
        
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Main", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
    print(t1, t2)

if __name__ == "__main__":
    main()

# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0

您的主程序在第一次调用 demo() 时延迟 1 秒,在第二次调用时延迟 2 秒。那是因为你的主线程没有事件循环,因此不能并行执行两个延迟。但这正是你所暗示的,当你说你想要一个使用第三方异步包的同步程序时。

这是一个类似的答案,但问题略有不同: