Combine several POST requests into one, transform this batch, and return answers for these POST requests

I'm not good at handling requests, but my current project needs this. Right now my server works like this:

from aiohttp import web

routes = web.RouteTableDef()


@routes.post('/')
async def my_func(request):
    post = await request.json()
    answer = '... do something on GPU ...'
    return web.json_response(answer)

But I want to combine multiple requests into one and run my function on the GPU only once, then return responses to all of the requests (possibly in a loop). I can switch from aiohttp to a different package if that is needed to solve this.

For example, a POST request contains the fields: {'id':1, 'data':'some data 1'}.

(1) I want to wait for 5 requests and combine their data into a list ['some data 1', ..., 'some data 5'].

(2) Then apply my function to this list (it returns a list of my answers ['answer 1', ..., 'answer 5']).

(3) Then I want to respond to each request like this: {'id':1, 'answers':'answer_1'}.

I don't know how to implement steps (1) and (3).

You can keep a cache of requests (questions) and responses (answers), plus a background task that checks that cache; when the questions cache reaches a length of 5, you run the GPU function and populate the answers cache.
Each request then waits until the answers cache contains the data it needs.

server.py

import asyncio
from aiohttp import web


def gpu_func(items):
    """Send a batch of 5 questions to GPU and return the answers"""
    answers = {}
    for item in items:
        answers[item["id"]] = "answer for data: " + item["data"]
    return answers


async def gpu_loop(app):
    """Check questions cache continuously and when we have 5 questions process them and populate answers cache"""
    while True:
        if len(app.cache["questions"]) >= 5:
            print("running GPU function")
            answers = gpu_func(app.cache["questions"][:5])
            print("got %d answers from GPU" % len(answers))
            app.cache["answers"].update(answers)
            app.cache["questions"] = app.cache["questions"][5:]
        await asyncio.sleep(0.05)  # sleep for 50ms


async def handle(request):
    """Main request handler: populate questions cache and wait for the answer to be available in the answers cache"""
    data = await request.post()
    print("got request with data ", data)
    request.app.cache["questions"].append(data)
    # a time limit could be implemented here using a counter (sleep_delay * counter = max time per request); see the sketch after server.py
    while True:
        if data["id"] in request.app.cache["answers"]:
            break
        await asyncio.sleep(0.05)
    answer = request.app.cache["answers"].pop(data["id"], "unknown")
    return web.Response(text=answer)


# create background task (gpu_loop)
async def start_background_tasks(app):
    app.gpu_loop = asyncio.create_task(gpu_loop(app))


# stop background task on shutdown
async def cleanup_background_tasks(app):
    app.gpu_loop.cancel()
    await app.gpu_loop


def main():
    app = web.Application()
    app.cache = {"questions": [], "answers": {}}
    app.add_routes([web.post("/", handle)])
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    web.run_app(app)


if __name__ == "__main__":
    main()
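
The comment inside handle() mentions bounding how long a request waits for its answer. Here is a minimal sketch of that idea (not part of the original answer): a hypothetical handle_with_timeout() that gives up after MAX_WAIT seconds, where MAX_WAIT and SLEEP_DELAY are made-up tuning constants.

MAX_WAIT = 5.0        # assumption: give up after 5 seconds
SLEEP_DELAY = 0.05    # same 50 ms polling interval used above


async def handle_with_timeout(request):
    """Variant of handle() that stops waiting after MAX_WAIT seconds."""
    data = await request.post()
    request.app.cache["questions"].append(data)
    waited = 0.0
    while data["id"] not in request.app.cache["answers"]:
        if waited >= MAX_WAIT:
            # the question stays in the cache and may still be processed later
            return web.Response(status=504, text="timed out waiting for the GPU batch")
        await asyncio.sleep(SLEEP_DELAY)
        waited += SLEEP_DELAY
    answer = request.app.cache["answers"].pop(data["id"], "unknown")
    return web.Response(text=answer)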

client.py

import aiohttp
import asyncio


async def make_request(session, num):
    """Make a single request using the existing session object and custom number"""
    url = "http://127.0.01:8080"
    data = {"id": num, "data": "question %d" % num}
    response = await session.post(url, data=data)
    text = await response.text()
    return text


async def main():
    """Make 20 consecutive requests with a delay of 20 ms between them"""
    tasks = []
    session = aiohttp.ClientSession()
    for i in range(20):
        print("making request %d", i)
        task = asyncio.ensure_future(make_request(session, i))
        tasks.append(task)
        await asyncio.sleep(0.02)
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response)
    await session.close()


asyncio.run(main())

Depending on the concurrency (how often the client sends requests) and the processing time (how long it takes to process a batch of requests), you may need to adjust the timing (the sleep intervals) and the cache limit (how many requests to accumulate in the cache).
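
For example (a sketch; BATCH_SIZE and POLL_INTERVAL are made-up names, not part of the code above), the two magic numbers in gpu_loop() can be lifted into module-level constants so they are easy to tune:

BATCH_SIZE = 5        # how many questions to accumulate before calling the GPU
POLL_INTERVAL = 0.05  # how often (in seconds) the background task checks the cache


async def gpu_loop(app):
    """Same loop as in server.py, with the tunables pulled out into constants."""
    while True:
        if len(app.cache["questions"]) >= BATCH_SIZE:
            answers = gpu_func(app.cache["questions"][:BATCH_SIZE])
            app.cache["answers"].update(answers)
            app.cache["questions"] = app.cache["questions"][BATCH_SIZE:]
        await asyncio.sleep(POLL_INTERVAL)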

Hope this gets you started.