通过 Tornado 中 IOLoop 附带的回调回写

Write back through the callback attached to IOLoop in Tornado

有一个棘手的 post 处理程序,有时会花费很多时间(取决于输入值),有时则不会。

我想要的是每过 1 秒就回写,动态分配响应。

def post():
    def callback():
        self.write('too-late')
        self.finish()

    timeout_obj = IOLoop.current().add_timeout(
        dt.timedelta(seconds=1),
        callback,
    )

    # some asynchronous operations

    if not self.request.connection.stream.closed():
        self.write('here is your response')
        self.finish()
        IOLoop.current().remove_timeout(timeout_obj)

事实证明我在 callback.

内部无能为力

即使引发异常也会被内部上下文抑制,不会通过 post 方法传递。

还有其他实现目标的方法吗?

谢谢。

UPD 2020-05-15: 我找到了类似的

感谢@ionut-ticus,使用with_timeout()更方便。

经过一些尝试,我认为我非常接近我正在寻找的东西:

def wait(fn):
    @gen.coroutine
    @wraps(fn)
    def wrap(*args):
        try:
            result = yield gen.with_timeout(
                    dt.timedelta(seconds=20),
                    IOLoop.current().run_in_executor(None, fn, *args),
            )
            raise gen.Return(result)
        except gen.TimeoutError:
            logging.error('### TOO LONG')
            raise gen.Return('Next time, bro')
    return wrap


@wait
def blocking_func(item):
    time.sleep(30)
    # this is not a Subprocess.
    # It is a file IO and DB
    return 'we are done here'
  1. 还是不确定, wait() 装饰器应该包裹在 协程?

  2. 有时在 blocking_func() 的一系列调用中,可以 是另一个 ThreadPoolExecutor。我有一个顾虑,这行得通吗 没有使 "mine" 成为一个全球性的,并传递给 龙卷风run_in_executor()?

龙卷风:v5.1.1

我建议使用 https://github.com/aio-libs/async-timeout:

import asyncio
import async_timeout

def post():
    try:
        async with async_timeout.timeout(1):
            # some asynchronous operations

            if not self.request.connection.stream.closed():
                self.write('here is your response')
                self.finish()
                IOLoop.current().remove_timeout(timeout_obj)
    except asyncio.TimeoutError:
        self.write('too-late')
        self.finish()

tornado.gen.with_timeout 的用法示例。请记住任务需要异步,否则 IOLoop 将被阻塞并且无法处理超时:

@gen.coroutine
def async_task():
    # some async code

@gen.coroutine
def get(self):
    delta = datetime.timedelta(seconds=1)
    try:
        task = self.async_task()
        result = yield gen.with_timeout(delta, task)
        self.write("success")
    except gen.TimeoutError:
        self.write("timeout")