Python,异步,单例事件循环
Python, Async, singleton event loop
我有很多异步代码,我有问题。
我可以在整个项目中有一个单例事件循环,还是我应该在每个函数、方法、class 中使用 get_event_loop()
?声明一次并在项目中的任何地方使用它是否存在一些问题?
比如我有3个文件app.py、views.py、internal.py
app.py
app = FastAPI()
loop = get_event_loop()
views.py
from app import app, loop
@app.get('/')
async def main(request):
loop.create_task(<any coroutine>)
return {'status': 'ok'}
internal.py
from app import loop
async def any_buisiness_logic():
loop.create_task(<any coroutine>)
return "task created"
或者我应该在每个文件中 get_event_loop()
?
您可以改用 asyncio.create_task
。在 Python.
的较新版本中不需要传递循环
The task is executed in the loop returned by get_running_loop(),
RuntimeError is raised if there is no running loop in current thread.
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
我有很多异步代码,我有问题。
我可以在整个项目中有一个单例事件循环,还是我应该在每个函数、方法、class 中使用 get_event_loop()
?声明一次并在项目中的任何地方使用它是否存在一些问题?
比如我有3个文件app.py、views.py、internal.py
app.py
app = FastAPI()
loop = get_event_loop()
views.py
from app import app, loop
@app.get('/')
async def main(request):
loop.create_task(<any coroutine>)
return {'status': 'ok'}
internal.py
from app import loop
async def any_buisiness_logic():
loop.create_task(<any coroutine>)
return "task created"
或者我应该在每个文件中 get_event_loop()
?
您可以改用 asyncio.create_task
。在 Python.
The task is executed in the loop returned by get_running_loop(), RuntimeError is raised if there is no running loop in current thread.
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task