处理速度快 api 多用户
Process fast api multi-user
正在研究通过fastapi分发人工智能模块的过程。
我要进行负载测试
我创建了一个api,它使用预先学习的模型通过快速api回答问题。
这种情况下,一个用户使用没有问题,但是当多个用户同时使用时,响应可能会太慢。
那么当多个用户输入一个问题时,有没有什么办法可以把模型复制过来一下子放进去?
class sentencebert_ai():
def __init__(self) -> None:
super().__init__()
def ask_query(self,query, topN):
startt = time.time()
ask_result = []
score = []
result_value = []
embedder = torch.load(model_path)
corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
query_embedding = embedder.encode(query, convert_to_tensor=True)
cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0] #torch.Size([121])121개의 말뭉치에 대한 코사인 유사도 값이다.
cos_scores = cos_scores.cpu()
top_results = np.argpartition(-cos_scores, range(topN))[0:topN]
for idx in top_results[0:topN]:
ask_result.append(corpusid[idx].item())
#.item()으로 접근하는 이유는 tensor(5)에서 해당 숫자에 접근하기 위한 방식이다.
score.append(round(cos_scores[idx].item(),3))
#서버에 json array 형태로 내보내기 위한 작업
for i,e in zip(ask_result,score):
result_value.append({"pred_id":i,"pred_weight":e})
endd = time.time()
print('시간체크',endd-startt)
return result_value
# return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)
class Item_inference(BaseModel):
text : str
topN : Optional[int] = 1
@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
# db.append(item.dict())
item.dict()
results = _ai.ask_query(item.text, item.topN)
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", default='9003', type=int)
# parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
args = parser.parse_args()
_ai = sentencebert_ai()
uvicorn.run(app, host="0.0.0.0", port=args.port,workers=4)
更正版本
@app.post("/aaa") def your_endpoint(request: Request, item:Item_inference): start = time.time() model = request.app.state.model item.dict() #커널 실행시 필요 _ai = sentencebert_ai() results = _ai.ask_query(item.text, item.topN,model) end = time.time() print(end-start) return results ```
首先,你不应该在每次请求到达时加载你的模型,而是在启动时加载一次(你可以使用 startup event for this) and store it on the app instance, which you can later retrieve, as described here and 。例如:
@app.on_event("startup")
async def startup_event():
app.state.model = torch.load(model_path)
from fastapi import Request
@app.post("/")
def your_endpoint(request: Request):
model = request.app.state.model
# then pass it to your ask_query function
其次,如果您不必 await
用于路由中的协程,那么您应该使用 def
而不是 async def
来定义路由。这样,FastAPI 将并发处理请求(每个请求将 运行 在单独的线程中),而 async def
在主线程上路由 运行,即服务器顺序处理请求(只要在此类路由中没有 await
调用 I/O-bound
操作)。请查看答案 and ,以及其中包含的所有参考资料,以了解 async/await
的概念,以及使用 def
和 async def
的区别.
正在研究通过fastapi分发人工智能模块的过程。
我要进行负载测试
我创建了一个api,它使用预先学习的模型通过快速api回答问题。
这种情况下,一个用户使用没有问题,但是当多个用户同时使用时,响应可能会太慢。
那么当多个用户输入一个问题时,有没有什么办法可以把模型复制过来一下子放进去?
class sentencebert_ai():
def __init__(self) -> None:
super().__init__()
def ask_query(self,query, topN):
startt = time.time()
ask_result = []
score = []
result_value = []
embedder = torch.load(model_path)
corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
query_embedding = embedder.encode(query, convert_to_tensor=True)
cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0] #torch.Size([121])121개의 말뭉치에 대한 코사인 유사도 값이다.
cos_scores = cos_scores.cpu()
top_results = np.argpartition(-cos_scores, range(topN))[0:topN]
for idx in top_results[0:topN]:
ask_result.append(corpusid[idx].item())
#.item()으로 접근하는 이유는 tensor(5)에서 해당 숫자에 접근하기 위한 방식이다.
score.append(round(cos_scores[idx].item(),3))
#서버에 json array 형태로 내보내기 위한 작업
for i,e in zip(ask_result,score):
result_value.append({"pred_id":i,"pred_weight":e})
endd = time.time()
print('시간체크',endd-startt)
return result_value
# return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)
class Item_inference(BaseModel):
text : str
topN : Optional[int] = 1
@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
# db.append(item.dict())
item.dict()
results = _ai.ask_query(item.text, item.topN)
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", default='9003', type=int)
# parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
args = parser.parse_args()
_ai = sentencebert_ai()
uvicorn.run(app, host="0.0.0.0", port=args.port,workers=4)
更正版本
@app.post("/aaa") def your_endpoint(request: Request, item:Item_inference): start = time.time() model = request.app.state.model item.dict() #커널 실행시 필요 _ai = sentencebert_ai() results = _ai.ask_query(item.text, item.topN,model) end = time.time() print(end-start) return results ```
首先,你不应该在每次请求到达时加载你的模型,而是在启动时加载一次(你可以使用 startup event for this) and store it on the app instance, which you can later retrieve, as described here and
@app.on_event("startup")
async def startup_event():
app.state.model = torch.load(model_path)
from fastapi import Request
@app.post("/")
def your_endpoint(request: Request):
model = request.app.state.model
# then pass it to your ask_query function
其次,如果您不必 await
用于路由中的协程,那么您应该使用 def
而不是 async def
来定义路由。这样,FastAPI 将并发处理请求(每个请求将 运行 在单独的线程中),而 async def
在主线程上路由 运行,即服务器顺序处理请求(只要在此类路由中没有 await
调用 I/O-bound
操作)。请查看答案 async/await
的概念,以及使用 def
和 async def
的区别.