正确使用 use/constraints,nest_asyncio?
Correct use/constraints of use, of nest_asyncio?
我正在努力使以前同步的网络服务器成为同步服务器。
我的大部分功能都是同步的,我想简单地从现有代码进行异步调用以避免异步蔓延。 nest_asyncio 似乎允许 run_until_complete 重入。
然而,虽然这适用于 1 个可重入调用,但我遇到了两个死锁:
import asyncio
import functools
import time
from random import random
import nest_asyncio
nest_asyncio.apply()
def sync(corot, loop=None):
"""
Make a synchronous function from an asynchronous one.
:param corot:
:return:
"""
if loop is None:
loop = asyncio.get_event_loop()
result, = loop.run_until_complete(asyncio.gather(corot))
return result
async def sync_to_corountine(func, *args, **kw):
"""
Make a coroutine from a synchronous function.
"""
try:
return func(*args, *kw)
finally:
# every async needs an await.
await asyncio.sleep(0)
def main():
async def background(timeout):
await asyncio.sleep(timeout)
print(f"Background: {timeout}")
loop = asyncio.get_event_loop()
# Run some background work to check we are never blocked
bg_tasks = [
loop.create_task(background(i))
for i in range(10)
]
async def long_running_async_task(result):
# Simulate slow IO
print(f"...START long_running_async_task [{result}]")
await asyncio.sleep(1)
print(f"...END long_running_async_task [{result}]")
return result
def sync_function_with_async_dependency(result):
print(f"...START sync_function_with_async_dependency [{result}]")
result = sync(long_running_async_task(result), loop=loop)
print(f"...END sync_function_with_async_dependency [{result}]")
return result
# Call sync_function_with_async_dependency
# One reentrant task is OK
# Multiple reentrant tasks=>fails to exit
n = 2
for i in range(n):
bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))
# for i in range(n):
# bg_tasks.append(long_running_async_task(i))
# OK
# bg_tasks.append(long_running_async_task(123))
# bg_tasks.append(long_running_async_task(456))
task = asyncio.gather(*bg_tasks) # , loop=loop)
loop.run_until_complete(task)
if __name__ == '__main__':
main()
...START sync_function_with_async_dependency [0]
...START sync_function_with_async_dependency [1]
Background: 0
...START long_running_async_task [0]
...START long_running_async_task [1]
Background: 1
...END long_running_async_task [0]
...END long_running_async_task [1]
...END sync_function_with_async_dependency [1]
Background: 2
Background: 3
Background: 4
...我们不见了
...END sync_function_with_async_dependency [0]
我使用 nest_asyncio 正确吗?
如果要在同步调用中嵌套异步调用,则需要使用 gevent,否则会导致饥饿问题。
asyncio 的整个设计是异步例程调用异步例程,但同步例程不能(通过 运行 除外)。他们这样做是因为他们觉得其他任何事情都会导致代码过于复杂而难以理解。可能 asyncio 中的错误也更难修复。
猴子补丁 nest_async 试图打破这个设计原则,但它做得不是很好,因为嵌套的 运行s 不会给嵌套之外的任务安排时间运行。如果您在嵌套 运行.
内的代码上花费太多时间,这可能会导致饥饿
我试图向作者解释这一点,但他仍在挣扎:
我正在努力使以前同步的网络服务器成为同步服务器。 我的大部分功能都是同步的,我想简单地从现有代码进行异步调用以避免异步蔓延。 nest_asyncio 似乎允许 run_until_complete 重入。
然而,虽然这适用于 1 个可重入调用,但我遇到了两个死锁:
import asyncio
import functools
import time
from random import random
import nest_asyncio
nest_asyncio.apply()
def sync(corot, loop=None):
"""
Make a synchronous function from an asynchronous one.
:param corot:
:return:
"""
if loop is None:
loop = asyncio.get_event_loop()
result, = loop.run_until_complete(asyncio.gather(corot))
return result
async def sync_to_corountine(func, *args, **kw):
"""
Make a coroutine from a synchronous function.
"""
try:
return func(*args, *kw)
finally:
# every async needs an await.
await asyncio.sleep(0)
def main():
async def background(timeout):
await asyncio.sleep(timeout)
print(f"Background: {timeout}")
loop = asyncio.get_event_loop()
# Run some background work to check we are never blocked
bg_tasks = [
loop.create_task(background(i))
for i in range(10)
]
async def long_running_async_task(result):
# Simulate slow IO
print(f"...START long_running_async_task [{result}]")
await asyncio.sleep(1)
print(f"...END long_running_async_task [{result}]")
return result
def sync_function_with_async_dependency(result):
print(f"...START sync_function_with_async_dependency [{result}]")
result = sync(long_running_async_task(result), loop=loop)
print(f"...END sync_function_with_async_dependency [{result}]")
return result
# Call sync_function_with_async_dependency
# One reentrant task is OK
# Multiple reentrant tasks=>fails to exit
n = 2
for i in range(n):
bg_tasks.append(sync_to_corountine(sync_function_with_async_dependency, i))
# for i in range(n):
# bg_tasks.append(long_running_async_task(i))
# OK
# bg_tasks.append(long_running_async_task(123))
# bg_tasks.append(long_running_async_task(456))
task = asyncio.gather(*bg_tasks) # , loop=loop)
loop.run_until_complete(task)
if __name__ == '__main__':
main()
...START sync_function_with_async_dependency [0]
...START sync_function_with_async_dependency [1]
Background: 0
...START long_running_async_task [0]
...START long_running_async_task [1]
Background: 1
...END long_running_async_task [0]
...END long_running_async_task [1]
...END sync_function_with_async_dependency [1]
Background: 2
Background: 3
Background: 4
...我们不见了
...END sync_function_with_async_dependency [0]
我使用 nest_asyncio 正确吗?
如果要在同步调用中嵌套异步调用,则需要使用 gevent,否则会导致饥饿问题。
asyncio 的整个设计是异步例程调用异步例程,但同步例程不能(通过 运行 除外)。他们这样做是因为他们觉得其他任何事情都会导致代码过于复杂而难以理解。可能 asyncio 中的错误也更难修复。
猴子补丁 nest_async 试图打破这个设计原则,但它做得不是很好,因为嵌套的 运行s 不会给嵌套之外的任务安排时间运行。如果您在嵌套 运行.
内的代码上花费太多时间,这可能会导致饥饿我试图向作者解释这一点,但他仍在挣扎: