Long-运行 异步服务器任务
Long-running tasks with async server
我想每个人都知道在django中如何处理long-运行ning任务:使用celery和放松。但是,如果我想通过 aiohttp(或 tornado)获得 websockets 的好处怎么办?
假设我有非常 CPU 的任务,可能需要几秒钟到多 (5-10) 分钟。在 websocket 循环中处理这个任务并通知用户进度看起来是个不错的主意。没有 ajax 个请求,对短任务的响应非常快。
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
long_running_task(msg.data, NotificationHelper(ws))
ws.send_str(json.dumps({
'action': 'got-answer',
'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
}))
return ws
但另一方面,据我所知,CPU-绑定任务以这种方式阻塞整个线程。如果我有 10 个工作人员和 11 个客户想要使用应用程序,则在第一个客户的任务完成之前不会为第 11 个客户提供服务。
也许,我应该 运行 在 celery 中看起来很大 的任务 和在主循环中看起来很小 的任务 ?
那么,我的问题是:是否有任何好的设计模式可以使用异步服务器来处理长时间 运行ning 任务?
谢谢!
只需 运行 运行 宁 CPU 绑定的任务 loop.run_in_executor()
并在 loop.call_soon_threadsafe()
前发送进度通知。
如果您的工作不是 CPU 但 IO 绑定(例如发送电子邮件),您可以通过 loop.create_task()
调用创建一个新任务。它看起来像生成新线程。
如果您不能使用即发即弃的方法,您需要使用像 RabbitMQ 这样的持久消息代理(有 https://github.com/benjamin-hodgson/asynqp 库以异步方式与 Rabbit 通信)。
我想每个人都知道在django中如何处理long-运行ning任务:使用celery和放松。但是,如果我想通过 aiohttp(或 tornado)获得 websockets 的好处怎么办?
假设我有非常 CPU 的任务,可能需要几秒钟到多 (5-10) 分钟。在 websocket 循环中处理这个任务并通知用户进度看起来是个不错的主意。没有 ajax 个请求,对短任务的响应非常快。
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
long_running_task(msg.data, NotificationHelper(ws))
ws.send_str(json.dumps({
'action': 'got-answer',
'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
}))
return ws
但另一方面,据我所知,CPU-绑定任务以这种方式阻塞整个线程。如果我有 10 个工作人员和 11 个客户想要使用应用程序,则在第一个客户的任务完成之前不会为第 11 个客户提供服务。
也许,我应该 运行 在 celery 中看起来很大 的任务 和在主循环中看起来很小 的任务 ?
那么,我的问题是:是否有任何好的设计模式可以使用异步服务器来处理长时间 运行ning 任务?
谢谢!
只需 运行 运行 宁 CPU 绑定的任务 loop.run_in_executor()
并在 loop.call_soon_threadsafe()
前发送进度通知。
如果您的工作不是 CPU 但 IO 绑定(例如发送电子邮件),您可以通过 loop.create_task()
调用创建一个新任务。它看起来像生成新线程。
如果您不能使用即发即弃的方法,您需要使用像 RabbitMQ 这样的持久消息代理(有 https://github.com/benjamin-hodgson/asynqp 库以异步方式与 Rabbit 通信)。