处理速度快 api 多用户

Process fast api multi-user

正在研究通过fastapi分发人工智能模块的过程。

我要进行负载测试

我创建了一个api,它使用预先学习的模型通过快速api回答问题。

这种情况下,一个用户使用没有问题,但是当多个用户同时使用时,响应可能会太慢。

那么当多个用户输入一个问题时,有没有什么办法可以把模型复制过来一下子放进去?


class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

 def ask_query(self,query, topN):
        startt = time.time()

        ask_result = []
        score = []
        result_value = []  
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0] #torch.Size([121])121개의 말뭉치에 대한 코사인 유사도 값이다.
        cos_scores = cos_scores.cpu()

        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]

        for idx in top_results[0:topN]:        
            ask_result.append(corpusid[idx].item())
            #.item()으로 접근하는 이유는 tensor(5)에서 해당 숫자에 접근하기 위한 방식이다.
            score.append(round(cos_scores[idx].item(),3))

        #서버에 json array 형태로 내보내기 위한 작업
        for i,e in zip(ask_result,score):
            result_value.append({"pred_id":i,"pred_weight":e})
        endd = time.time()
        print('시간체크',endd-startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)



class Item_inference(BaseModel):
    text : str
    topN : Optional[int] = 1

@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
  
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)

    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default='9003', type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    uvicorn.run(app, host="0.0.0.0", port=args.port,workers=4)

更正版本

@app.post("/aaa") def your_endpoint(request: Request, item:Item_inference): start = time.time() model = request.app.state.model item.dict() #커널 실행시 필요 _ai = sentencebert_ai() results = _ai.ask_query(item.text, item.topN,model) end = time.time() print(end-start) return results ``` 

首先,你不应该在每次请求到达时加载你的模型,而是在启动时加载一次(你可以使用 startup event for this) and store it on the app instance, which you can later retrieve, as described here and 。例如:

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load(model_path)

from fastapi import Request

@app.post("/")
def your_endpoint(request: Request):
        model = request.app.state.model
        # then pass it to your ask_query function

其次,如果您不必 await 用于路由中的协程,那么您应该使用 def 而不是 async def 来定义路由。这样,FastAPI 将并发处理请求(每个请求将 运行 在单独的线程中),而 async def 在主线程上路由 运行,即服务器顺序处理请求(只要在此类路由中没有 await 调用 I/O-bound 操作)。请查看答案 and ,以及其中包含的所有参考资料,以了解 async/await 的概念,以及使用 defasync def 的区别.