如何在同步代码的龙卷风中使用进程池执行器?

How to use Process Pool Executor in tornado with synchronous code?

我是 tornado 的新手,并且有一个 API 进行阻塞数据库调用。由于此阻塞调用,如果同时有多个请求,tornado 将无法满足所有请求。

我查看了一下,发现可以使用两种方法解决它:使用进程池执行器使代码异步 and/or。我在这里的假设是,拥有多个进程池执行程序就像在龙卷风上拥有多个进程来服务多个请求。我查看的关于实现 Process Pool Executor 的每个示例也使代码异步。

我暂时不能使代码异步,因为它需要更多的代码更改,所以我正在寻找使用进程池执行器的简单修复。

我目前拥有的

import tornado.ioloop
import tornado.web


def blocking_call():
    import time
    time.sleep(60)
    return "Done"


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        val = blocking_call()
        self.write(val)


if __name__ == "__main__":
    app = tornado.web.Application([(r"/", MainHandler)])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

我试过的

import tornado.ioloop
import tornado.web
from concurrent.futures import ProcessPoolExecutor


def blocking_call():
    import time
    time.sleep(60)
    return "Done"


class MainHandler(tornado.web.RequestHandler):
    def initialize(self, executor):
        self.executor = executor

    def get(self):
        val = self.executor.submit(blocking_call)
        self.write(val)


if __name__ == "__main__":
    executor = ProcessPoolExecutor(5)

    app = tornado.web.Application(
        [(r"/", MainHandler, dict(executor=executor))])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

我对这种方法的问题是,现在我得到的是 future object 而不是实际响应。如何让 Get 请求等待 self.executor 完成后再发回响应?

executor.submit() returns 一个 concurrent.futures.Future 不可等待。

建议使用Tornado的run_in_executor方法执行阻塞任务

async def get(self):
    loop = tornado.ioloop.IOLoop.current()
    val = await loop.run_in_executor(self.executor, blocking_call)
    self.write(val)