How to cancel a previous request when a new request comes from the same user in the same session
We are building a REST API with aiohttp. Our application is designed so that users send requests more often than they receive responses (because of the computation time). Only the result of the latest request matters to the user. Is it possible to stop the computation for outdated requests?
Thanks
You are building something very un-HTTP-like. An HTTP request should not take more than a few milliseconds to answer, and HTTP requests should not depend on each other. If you need to run computations that take a long time, either try to speed them up by changing the architecture/model/caching/whatever, or treat them explicitly as long-running jobs behind an HTTP interface. That means expressing "a job" as a resource that can be queried via HTTP. You create the resource with a POST request:
POST /tasks
Content-Type: application/json

{"some": "parameters", "go": "here"}

The response points to the newly created resource:

{"resource": "/tasks/42"}
The status of the task can then be queried:
GET /tasks/42
{"status": "pending"}
and eventually you get the result:
GET /tasks/42
{"status": "done", "results": [...]}
When you POST a new task that supersedes an old one, your backend can cancel the old task in whatever way it sees fit; the resource would then return a status of "cancelled" or similar. Your client simply won't query the old resource again after starting a new task.
Even if your client polls the resource once per second, this still uses fewer resources on the server (ten connections open for 200 ms each over the same period, instead of one connection held open for 10 seconds), especially if you apply some smart caching to it. It is also much more scalable, since you can scale the task backend independently of the HTTP front end, and the HTTP front end trivially scales to multiple servers behind a load balancer.
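The POST/GET polling pattern above can be sketched with a minimal in-memory job store. The class and method names here are illustrative, not part of the original answer, and a real deployment would keep this state in a shared backend (Redis, a database) so the task backend can scale independently of the HTTP front end:

```python
import asyncio


class JobStore:
    """Minimal in-memory store for the /tasks pattern sketched above."""

    def __init__(self):
        self._jobs = {}      # job_id -> {'status': ..., 'results': ...(+ 'task')}
        self._next_id = 0

    def create(self, coro_factory):
        """Start a job and return its id (the '/tasks/<id>' resource)."""
        self._next_id += 1
        job_id = self._next_id

        async def run():
            try:
                results = await coro_factory()
            except asyncio.CancelledError:
                self._jobs[job_id]['status'] = 'cancelled'
                raise
            else:
                self._jobs[job_id].update(status='done', results=results)

        self._jobs[job_id] = {'status': 'pending',
                              'task': asyncio.ensure_future(run())}
        return job_id

    def status(self, job_id):
        """What a GET /tasks/<id> handler would serialize."""
        return {k: v for k, v in self._jobs[job_id].items() if k != 'task'}

    def cancel(self, job_id):
        """What the backend does when a new task supersedes this one."""
        self._jobs[job_id]['task'].cancel()


async def demo():
    store = JobStore()

    async def slow_computation():
        await asyncio.sleep(0.1)     # stands in for the expensive work
        return [1, 2, 3]

    old_id = store.create(slow_computation)   # superseded request
    new_id = store.create(slow_computation)   # latest request
    await asyncio.sleep(0)                    # let both jobs start running
    store.cancel(old_id)                      # backend drops the stale job
    await asyncio.sleep(0.2)                  # let the latest job finish
    return store.status(old_id), store.status(new_id)


old_status, new_status = asyncio.run(demo())
print(old_status)   # {'status': 'cancelled'}
print(new_status)   # {'status': 'done', 'results': [1, 2, 3]}
```

The HTTP handlers then reduce to thin wrappers: POST calls `create()` and returns the resource path, GET calls `status()`, and a superseding POST calls `cancel()` on the old job first.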
I will post the solution I got from @Drizzt1991:
Hey there, Artem. Quite a strange requirement you have there.
Understand that if a client uses a single keep-alive socket, the server cannot see the next request before answering the first one. That is how HTTP works: a response is expected before the client sends another request on the same connection.
So your case will only work if the client operates on two separate sockets, and even then you need to ensure that two sockets from the same client are routed to the same machine. In practice this does not play well with failover and the like; it essentially makes the API stateful.
Even if you do all that, not all libraries support cancellation. Traditional relational databases will merely discard the result but will still process the pending query. It does work well if you are doing complex multi-step work, such as graph traversal, where the individual steps can be cancelled.
But if you can assert that the client uses a pool of sockets, that its requests are routed to the same machine, and that the requests actually benefit from cancellation, something like this should do the trick:
import asyncio
import random

from aiohttp import web


def get_session_id(request):
    # Session management is application-specific, so it is left out here.
    return ""


async def handle(request):
    session_id = get_session_id(request)
    # Random id so overlapping requests can be told apart in the logs.
    request['tr_id'] = tr_id = int(random.random() * 1000000)
    running_tasks = request.app['running_tasks']
    # If this session already has a request in flight, cancel it.
    if session_id in running_tasks and not running_tasks[session_id].done():
        running_tasks[session_id].cancel()
        del running_tasks[session_id]
    current_task = asyncio.ensure_future(_handle_impl(request))
    running_tasks[session_id] = current_task
    try:
        resp = await current_task
    except asyncio.CancelledError:
        print("Cancelled request", tr_id)
        resp = web.Response(text="Cancelled {}".format(tr_id))
    finally:
        # Only remove the bookkeeping entry if it is still ours; a newer
        # request may already have replaced it.
        if running_tasks.get(session_id) is current_task:
            del running_tasks[session_id]
    return resp


async def _handle_impl(request):
    tr_id = request['tr_id']
    print("Start request", tr_id)
    await asyncio.sleep(10)  # stands in for the slow computation
    print("Finished request", tr_id)
    return web.Response(text="Finished {}".format(tr_id))


app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)
app['running_tasks'] = {}
web.run_app(app, host="127.0.0.1", port=8080)
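Stripped of the aiohttp layer, the cancel-and-replace bookkeeping in the handler above can be exercised directly. This sketch (the session id and helper names are illustrative) fires three overlapping requests for the same session and shows that only the latest one finishes:

```python
import asyncio

running_tasks = {}   # session_id -> currently pending task


async def _work(tr_id):
    await asyncio.sleep(0.1)          # stands in for the slow computation
    return "Finished {}".format(tr_id)


async def handle(session_id, tr_id):
    # Same bookkeeping as the aiohttp handler above: cancel any pending
    # task for this session before starting a new one.
    if session_id in running_tasks and not running_tasks[session_id].done():
        running_tasks[session_id].cancel()
        del running_tasks[session_id]
    current_task = asyncio.ensure_future(_work(tr_id))
    running_tasks[session_id] = current_task
    try:
        resp = await current_task
    except asyncio.CancelledError:
        resp = "Cancelled {}".format(tr_id)
    finally:
        # Only remove the entry if it is still ours; a newer request may
        # already have replaced it.
        if running_tasks.get(session_id) is current_task:
            del running_tasks[session_id]
    return resp


async def demo():
    # Three overlapping "requests" from the same session.
    return await asyncio.gather(
        handle("alice", 1), handle("alice", 2), handle("alice", 3)
    )


results = asyncio.run(demo())
print(results)   # ['Cancelled 1', 'Cancelled 2', 'Finished 3']
```

Cancelling the asyncio task only stops work at `await` points; as noted above, a blocking database driver would keep executing its query regardless, so this helps only when the computation is itself cooperative.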