为同步执行包装 python async
Wrapping python async for synchronous execution
我正在尝试尽快从本地 Postgres 数据库加载数据,看来性能最高的 python 包是 asyncpg。我的代码是同步的,我反复需要加载数据块。我对让 async
关键字传播到我编写的每个函数不感兴趣,所以我试图将异步代码包装在同步函数中。
下面的代码有效,但非常丑陋:
def connect_to_postgres(user, password, database, host):
async def wrapped():
return await asyncpg.connect(user=keys['user'], password=keys['password'],
database='markets', host='127.0.0.1')
loop = asyncio.get_event_loop()
db_connection = loop.run_until_complete(wrapped())
return db_connection
db_connection = connect_to_postgres(keys['user'], keys['password'],
'db', '127.0.0.1')
def fetch_from_postgres(query, db_connection):
async def wrapped():
return await db_connection.fetch(query)
loop = asyncio.get_event_loop()
values = loop.run_until_complete(wrapped())
return values
fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)
在 Julia 我会做类似的事情
f() = @async 5
g() = fetch(f())
g()
但在 Python 中,我似乎不得不做一些笨拙的事情,
async def f():
return 5
def g():
loop = asyncio.get_event_loop()
return loop.run_until_complete(f())
只是想知道是否有更好的方法?
编辑:后一个python例子当然可以用
来写
def fetch(x):
loop = asyncio.get_event_loop()
return loop.run_until_complete(x)
尽管如此,除非我遗漏了什么,否则仍然需要创建一个异步包装函数。
编辑 2:我确实关心性能,但希望使用同步编程方法。 asyncpg 比 psycopg2 快 3 倍,因为它的核心实现是在 Cython 而不是 Python 中,这在 https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/ 中有更详细的解释。因此我希望包装这个异步代码。
编辑 3:提出这个问题的另一种方式是在 python 中避免 "what color is your function" 的最佳方法是什么?
如果一开始就设置好程序结构,这并不难。您创建第二个线程,您的异步代码将在其中 运行,并启动其事件循环。当保持完全同步的主线程需要异步调用(协程)的结果时,可以使用方法 asyncio.run_coroutine_threadsafe
。该方法 returns 一个 concurrent.futures.Future 对象。您可以通过调用其方法 result() 来获取返回值,该方法会阻塞直到结果可用。
这几乎就像您像调用子例程一样调用异步方法。开销很小,因为您只创建了一个辅助线程。这是一个简单的例子:
import asyncio
import threading
from datetime import datetime
async def demo(t):
await asyncio.sleep(t)
print(f"Demo function {t} {datetime.now()}")
return t
def main():
def thr(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop = asyncio.new_event_loop()
t = threading.Thread(target=thr, args=(loop, ), daemon=True)
t.start()
print("Main", datetime.now())
t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
print(t1, t2)
if __name__ == "__main__":
main()
# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0
您的主程序在第一次调用 demo() 时延迟 1 秒,在第二次调用时延迟 2 秒。那是因为你的主线程没有事件循环,因此不能并行执行两个延迟。但这正是你所暗示的,当你说你想要一个使用第三方异步包的同步程序时。
这是一个类似的答案,但问题略有不同:
我正在尝试尽快从本地 Postgres 数据库加载数据,看来性能最高的 python 包是 asyncpg。我的代码是同步的,我反复需要加载数据块。我对让 async
关键字传播到我编写的每个函数不感兴趣,所以我试图将异步代码包装在同步函数中。
下面的代码有效,但非常丑陋:
def connect_to_postgres(user, password, database, host):
async def wrapped():
return await asyncpg.connect(user=keys['user'], password=keys['password'],
database='markets', host='127.0.0.1')
loop = asyncio.get_event_loop()
db_connection = loop.run_until_complete(wrapped())
return db_connection
db_connection = connect_to_postgres(keys['user'], keys['password'],
'db', '127.0.0.1')
def fetch_from_postgres(query, db_connection):
async def wrapped():
return await db_connection.fetch(query)
loop = asyncio.get_event_loop()
values = loop.run_until_complete(wrapped())
return values
fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)
在 Julia 我会做类似的事情
f() = @async 5
g() = fetch(f())
g()
但在 Python 中,我似乎不得不做一些笨拙的事情,
async def f():
return 5
def g():
loop = asyncio.get_event_loop()
return loop.run_until_complete(f())
只是想知道是否有更好的方法?
编辑:后一个python例子当然可以用
来写def fetch(x):
loop = asyncio.get_event_loop()
return loop.run_until_complete(x)
尽管如此,除非我遗漏了什么,否则仍然需要创建一个异步包装函数。
编辑 2:我确实关心性能,但希望使用同步编程方法。 asyncpg 比 psycopg2 快 3 倍,因为它的核心实现是在 Cython 而不是 Python 中,这在 https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/ 中有更详细的解释。因此我希望包装这个异步代码。
编辑 3:提出这个问题的另一种方式是在 python 中避免 "what color is your function" 的最佳方法是什么?
如果一开始就设置好程序结构,这并不难。您创建第二个线程,您的异步代码将在其中 运行,并启动其事件循环。当保持完全同步的主线程需要异步调用(协程)的结果时,可以使用方法 asyncio.run_coroutine_threadsafe
。该方法 returns 一个 concurrent.futures.Future 对象。您可以通过调用其方法 result() 来获取返回值,该方法会阻塞直到结果可用。
这几乎就像您像调用子例程一样调用异步方法。开销很小,因为您只创建了一个辅助线程。这是一个简单的例子:
import asyncio
import threading
from datetime import datetime
async def demo(t):
await asyncio.sleep(t)
print(f"Demo function {t} {datetime.now()}")
return t
def main():
def thr(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop = asyncio.new_event_loop()
t = threading.Thread(target=thr, args=(loop, ), daemon=True)
t.start()
print("Main", datetime.now())
t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
print(t1, t2)
if __name__ == "__main__":
main()
# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0
您的主程序在第一次调用 demo() 时延迟 1 秒,在第二次调用时延迟 2 秒。那是因为你的主线程没有事件循环,因此不能并行执行两个延迟。但这正是你所暗示的,当你说你想要一个使用第三方异步包的同步程序时。
这是一个类似的答案,但问题略有不同: