0

I'm studying the process of distributing artificial intelligence modules through fastapi.

I'm going to take a load test

I created an api that answers questions through fastapi using a pre-learned model.

In this case, it is not a problem for one user to use it, but when multiple users use it at the same time, the response may be too slow.

So when multiple users enter a question, is there any way to copy the model and put it in at once?


class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

 def ask_query(self,query, topN):
        startt = time.time()

        ask_result = []
        score = []
        result_value = []  
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0] #torch.Size([121])121개의 말뭉치에 대한 코사인 유사도 값이다.
        cos_scores = cos_scores.cpu()

        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]

        for idx in top_results[0:topN]:        
            ask_result.append(corpusid[idx].item())
            #.item()으로 접근하는 이유는 tensor(5)에서 해당 숫자에 접근하기 위한 방식이다.
            score.append(round(cos_scores[idx].item(),3))

        #서버에 json array 형태로 내보내기 위한 작업
        for i,e in zip(ask_result,score):
            result_value.append({"pred_id":i,"pred_weight":e})
        endd = time.time()
        print('시간체크',endd-startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)



class Item_inference(BaseModel):
    text : str
    topN : Optional[int] = 1

@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
  
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)

    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default='9003', type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    uvicorn.run(app, host="0.0.0.0", port=args.port,workers=4)

corrected version

@app.post("/aaa") def your_endpoint(request: Request, item:Item_inference): start = time.time() model = request.app.state.model item.dict() #커널 실행시 필요 _ai = sentencebert_ai() results = _ai.ask_query(item.text, item.topN,model) end = time.time() print(end-start) return results ``` 
WONJUN
  • 13
  • 3
  • This question is not very clear, can you reformulate it and complete a bit the code ? – Ziur Olpa Mar 25 '22 at 07:24
  • If your recommendation engine takes a lot of time, there's not really much you can do magically to speed that up - limit the amount of work done that is not specific to each user (so that depends on how `ask_query` is implemented). Since this is probably CPU bound, you might want to instead start multiple instances (worker threads/processes) of your application when using gunicorn or similar, so that you can use more processor cores efficiently. – MatsLindh Mar 25 '22 at 07:39
  • i edit my code thank u – WONJUN Mar 25 '22 at 07:39
  • @MatsLindh Thank you. I didn't know Gunicorn, but I'll give it a try! – WONJUN Mar 25 '22 at 07:45
  • @MatsLindh OP is already using `uvicorn`, no need to use `gunicorn`. – AKX Mar 25 '22 at 07:46
  • Ah, I missed that. My bad. uvicorn should support the same through `workers`. – MatsLindh Mar 25 '22 at 08:25
  • @MatsLindh Thanks to you, I learned about the workers parameter and I can use it. – WONJUN Mar 28 '22 at 02:15

1 Answers1

1

Firstly, you should not load your model every time a request arrives, but rahter have it loaded once at startup (you could use the startup event for this) and store it on the app instance, which you can later retrieve, as described here and here. For instance:

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load(model_path)

from fastapi import Request

@app.post("/")
def your_endpoint(request: Request):
        model = request.app.state.model
        # then pass it to your ask_query function

Secondly, if you do not have to await for coroutines in your route, then you should rather define your route with def instead of async def. In this way, FastAPI will process the requests concurrently (each will run in a separate thread), whereas async def routes run on the main thread, i.e., the server processes the requests sequentially (as long as there is no await call to I/O-bound operations inside such routes). Please have a look at the answers here and here, as well as all the references included in them, to understand the concept of async/await, and the difference between using def and async def.

Chris
  • 4,940
  • 2
  • 7
  • 28
  • 1
    Thank you for the best answer that suits my situation. – WONJUN Mar 25 '22 at 08:46
  • Thank you Chris. Like the advice you gave, As a result of saving the model in the app and loading it, The average response time was reduced to 0.72 -> 0.5. And since I don't need to use concurrence, I decided to use def instead of async def. Could you please check if the method is correct? – WONJUN Mar 28 '22 at 02:19
  • I was able to save time by instantiating. In the performance load test, I didn't see any reduction in time by using def, so I was wondering if I understood something wrong. – WONJUN Mar 28 '22 at 03:23