Combine several POST requests into one batch, process the batch, and return answers for each request
I'm not good at handling requests, but my current project needs this. Right now my server works like this:
from aiohttp import web

@routes.post('/')
async def my_func(request):
    post = await request.json()
    answer = '... do something on GPU ...'
    return web.json_response(answer)
But I want to combine several requests into one, run my function on the GPU only once, and then return a response to every request (probably in a loop). I can switch from aiohttp to a different package if that's needed to solve this.
For example, a POST request contains the fields: {'id': 1, 'data': 'some data 1'}.
(1) I want to wait for 5 requests and combine their data into a list ['some data 1', ..., 'some data 5']
(2) Then apply my function to this list (it returns a list of my answers ['answer 1', ..., 'answer 5'])
(3) Then I want to respond to each request like this: {'id': 1, 'answers': 'answer_1'}
I don't know how to implement steps (1) and (3).
You can keep a cache of requests (questions) and a cache of responses (answers), plus a background task that checks those caches: when the question cache reaches a length of 5, you run the GPU function and populate the answer cache.
Each request then waits until the answer it needs appears in the answer cache.
server.py
import asyncio
import contextlib

from aiohttp import web

def gpu_func(items):
    """Send a batch of 5 questions to the GPU and return the answers"""
    answers = {}
    for item in items:
        answers[item["id"]] = "answer for data: " + item["data"]
    return answers

async def gpu_loop(app):
    """Check the questions cache continuously; when we have 5 questions, process them and populate the answers cache"""
    while True:
        if len(app.cache["questions"]) >= 5:
            print("running GPU function")
            answers = gpu_func(app.cache["questions"][:5])
            print("got %d answers from GPU" % len(answers))
            app.cache["answers"].update(answers)
            app.cache["questions"] = app.cache["questions"][5:]
        await asyncio.sleep(0.05)  # sleep for 50 ms

async def handle(request):
    """Main request handler: populate the questions cache and wait for the answer to appear in the answers cache"""
    data = await request.post()
    print("got request with data ", data)
    request.app.cache["questions"].append(data)
    # could implement a time limit here using a counter (sleep_delay * counter = max time per request)
    while True:
        if data["id"] in request.app.cache["answers"]:
            break
        await asyncio.sleep(0.05)
    answer = request.app.cache["answers"].pop(data["id"], "unknown")
    return web.Response(text=answer)

# create the background task (gpu_loop)
async def start_background_tasks(app):
    app.gpu_loop = asyncio.create_task(gpu_loop(app))

# stop the background task on shutdown
async def cleanup_background_tasks(app):
    app.gpu_loop.cancel()
    # awaiting a cancelled task raises CancelledError, so suppress it here
    with contextlib.suppress(asyncio.CancelledError):
        await app.gpu_loop

def main():
    app = web.Application()
    app.cache = {"questions": [], "answers": {}}
    app.add_routes([web.post("/", handle)])
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    web.run_app(app)

if __name__ == "__main__":
    main()
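As an alternative to the polling loop in handle(), each request can be given its own asyncio.Future, which the batch worker resolves directly; this avoids repeated sleep/check cycles. The sketch below is not part of the original answer, and the names pending, batch_worker, handle_one, and demo are made up for illustration:

```python
import asyncio

# Each entry is a (data, future) pair waiting to be batched.
pending = []

async def batch_worker(batch_size=5):
    """Resolve waiting futures in batches of `batch_size` (stand-in for the GPU call)."""
    while True:
        if len(pending) >= batch_size:
            batch = [pending.pop(0) for _ in range(batch_size)]
            for item, fut in batch:
                fut.set_result("answer for data: " + item["data"])
        await asyncio.sleep(0.01)

async def handle_one(item):
    """What an aiohttp handler would do: enqueue the request and await its future."""
    fut = asyncio.get_running_loop().create_future()
    pending.append((item, fut))
    return await fut  # no polling: batch_worker resolves this directly

async def demo():
    worker = asyncio.create_task(batch_worker())
    items = [{"id": i, "data": "question %d" % i} for i in range(5)]
    answers = await asyncio.gather(*(handle_one(it) for it in items))
    worker.cancel()
    return answers

print(asyncio.run(demo()))
```

With futures, a per-request timeout also becomes a one-liner: wrap the await in asyncio.wait_for(fut, timeout).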
client.py
import aiohttp
import asyncio

async def make_request(session, num):
    """Make a single request using the existing session object and a custom number"""
    url = "http://127.0.0.1:8080"
    data = {"id": num, "data": "question %d" % num}
    response = await session.post(url, data=data)
    text = await response.text()
    return text

async def main():
    """Make 20 consecutive requests with a delay of 20 ms between them"""
    tasks = []
    session = aiohttp.ClientSession()
    for i in range(20):
        print("making request %d" % i)
        task = asyncio.ensure_future(make_request(session, i))
        tasks.append(task)
        await asyncio.sleep(0.02)
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response)
    await session.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Depending on the concurrency (how often requests come in from clients) and the processing time (how long it takes to process a batch of requests), you may need to tune the timing (the sleep intervals) and the cache limit (how many requests to hold in the cache).
Hope this gets you started.
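The comment in handle() mentions bounding the wait with a counter (sleep_delay * counter = max time per request). A minimal sketch of that idea, extracted as a standalone helper; the function name wait_for_answer is made up for illustration:

```python
import asyncio

async def wait_for_answer(answers, key, sleep_delay=0.05, max_ticks=100):
    """Poll the `answers` dict until `key` appears, or give up after
    roughly sleep_delay * max_ticks seconds."""
    for _ in range(max_ticks):
        if key in answers:
            return answers.pop(key)
        await asyncio.sleep(sleep_delay)
    return "timeout"  # the handler could turn this into a 504 response

# quick check: answer already present vs. an answer that never arrives
print(asyncio.run(wait_for_answer({"1": "answer 1"}, "1")))  # → answer 1
print(asyncio.run(wait_for_answer({}, "2", 0.001, 3)))       # → timeout
```

Popping the answer inside the helper also keeps the answers cache from growing when requests time out.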