快速Api 与他人交流Api
FastApi communication with other Api
我最近在使用 fastapi 作为练习,我想将我的 fastapi api 与其他服务器上的验证服务连接...但我没有知道怎么做,我没有在官方文档中找到对我有帮助的东西。我必须用 python 代码来做吗?或者有什么办法吗?
谢谢你的帮助,请原谅我的英语。
您需要使用 Python 对其进行编码。
如果您使用异步,您应该使用也是异步的 HTTP 客户端,例如 aiohttp。
import aiohttp
@app.get("/")
async def slow_route():
async with aiohttp.ClientSession() as session:
async with session.get("http://validation_service.com") as resp:
data = await resp.text()
# do something with data
接受的答案当然有效,但不是有效的解决方案。对于每个请求,ClientSession
关闭,因此我们失去了 ClientSession
的优势 [0]:连接池、keepalive 等
我们可以使用FastAPI中的startup
和shutdown
事件[1],分别在服务器启动和关闭时触发。在这些事件中,可以创建一个 ClientSession
实例并在整个应用程序运行时使用它(因此充分发挥其潜力)。
ClientSession
实例存储在应用程序状态中。 [2]
这里我在aiohttp服务器的上下文中回答了一个非常相似的问题:
from __future__ import annotations
import asyncio
from typing import Final
from aiohttp import ClientSession
from fastapi import Depends, FastAPI
from starlette.requests import Request
app: Final = FastAPI()
@app.on_event("startup")
async def startup_event():
setattr(app.state, "client_session", ClientSession(raise_for_status=True))
@app.on_event("shutdown")
async def shutdown_event():
await asyncio.wait((app.state.client_session.close()), timeout=5.0)
def client_session_dep(request: Request) -> ClientSession:
return request.app.state.client_session
@app.get("/")
async def root(
client_session: ClientSession = Depends(client_session_dep),
) -> str:
async with client_session.get(
"https://example.com/", raise_for_status=True
) as the_response:
return await the_response.text()
我最近在使用 fastapi 作为练习,我想将我的 fastapi api 与其他服务器上的验证服务连接...但我没有知道怎么做,我没有在官方文档中找到对我有帮助的东西。我必须用 python 代码来做吗?或者有什么办法吗?
谢谢你的帮助,请原谅我的英语。
您需要使用 Python 对其进行编码。
如果您使用异步,您应该使用也是异步的 HTTP 客户端,例如 aiohttp。
import aiohttp
@app.get("/")
async def slow_route():
async with aiohttp.ClientSession() as session:
async with session.get("http://validation_service.com") as resp:
data = await resp.text()
# do something with data
接受的答案当然有效,但不是有效的解决方案。对于每个请求,ClientSession
关闭,因此我们失去了 ClientSession
的优势 [0]:连接池、keepalive 等
我们可以使用FastAPI中的startup
和shutdown
事件[1],分别在服务器启动和关闭时触发。在这些事件中,可以创建一个 ClientSession
实例并在整个应用程序运行时使用它(因此充分发挥其潜力)。
ClientSession
实例存储在应用程序状态中。 [2]
这里我在aiohttp服务器的上下文中回答了一个非常相似的问题:
from __future__ import annotations
import asyncio
from typing import Final
from aiohttp import ClientSession
from fastapi import Depends, FastAPI
from starlette.requests import Request
app: Final = FastAPI()
@app.on_event("startup")
async def startup_event():
setattr(app.state, "client_session", ClientSession(raise_for_status=True))
@app.on_event("shutdown")
async def shutdown_event():
await asyncio.wait((app.state.client_session.close()), timeout=5.0)
def client_session_dep(request: Request) -> ClientSession:
return request.app.state.client_session
@app.get("/")
async def root(
client_session: ClientSession = Depends(client_session_dep),
) -> str:
async with client_session.get(
"https://example.com/", raise_for_status=True
) as the_response:
return await the_response.text()