如何在 Tornado HTTP 服务器上通过 HTTP 让长任务 "cancellable"?

How to let long tasks be "cancellable" via HTTP on Tornado HTTP servers?


目前,我只是直接从Tornado的进程中调用繁重的任务。我使用 jQuery 准备了某种基于 Web 的界面,让它继续 AJAX 请求,并在表单中设置参数。

如您所想,我从网络浏览器中抛出的任务不可取消。我可以取消的唯一方法是向 Python 进程发送 9 或 15 信号,而这不是用户通常可以做的。

我想通过 HTTP 请求某种 "cancel" 请求来取消当前工作任务。如何做呢?大多数处理繁重任务(例如 YouTube 中的视频编码)的网络服务在做什么?

实际上 Tornado 的 Futures 不支持取消(docs). Moreover even using with_timeout,time-outed 作业仍在 运行,只是没有等待其结果。

正如 中所述,唯一的方法是以可以取消的方式实现逻辑(使用一些标志或其他)。


  • 作业是一个简单的异步睡眠
  • / 列出职位
  • /add/TIME 添加新工作 - TIME 以秒为单位 - 指定休眠时间
  • /cancel/ID 取消作业


from tornado.ioloop import IOLoop
from tornado import gen, web
from time import time

class Job():

    def __init__(self, run_sec):
        self.run_sec = int(run_sec)
        self.start_time = None
        self.end_time = None
        self._cancelled = False

    def run(self):
        """ Some job

        The job is simple: sleep for a given number of seconds.
        It could be implemented as:
             yield gen.sleep(self.run_sec)
        but this way makes it not cancellable, so
        it is divided: run 1s sleep, run_sec times 
        self.start_time = time()
        deadline = self.start_time + self.run_sec
        while not self._cancelled:
            yield gen.sleep(1)
            if time() >= deadline:
        self.end_time = time()

    def cancel(self):
    """ Cancels job

    Returns None on success,
    raises Exception on error:
      if job is already cancelled or done
        if self._cancelled:
            raise Exception('Job is already cancelled')
        if self.end_time is not None:
            raise Exception('Job is already done')
        self._cancelled = True

    def get_state(self):
        if self._cancelled:
            if self.end_time is None:
                # job might be running still
                # and will be stopped on the next while check
                return 'CANCELING...'
                return 'CANCELLED'
        elif self.end_time is None:
            return 'RUNNING...'
        elif self.start_time is None:
            # actually this never will shown
            # as after creation, job is immediately started
            return 'NOT STARTED'
            return 'DONE'

class MainHandler(web.RequestHandler):

    def get(self, op=None, param=None):
        if op == 'add':
            # add new job
            new_job = Job(run_sec=param)
            self.write('Job added')
        elif op == 'cancel':
            # cancel job - stop running
            self.write('Job cancelled')
            # list jobs
            self.write('<pre>') # this is so ugly... ;P
            for idx, job in enumerate(self.application.jobs):
                self.write('%s\t%s\t%s\t%s\t%s\n' % (
                    idx, job.run_sec, job.start_time,
                    job.get_state(), job.end_time

class MyApplication(web.Application):

    def __init__(self):

        # to store tasks
        self.jobs = []

        super(MyApplication, self).__init__([
            (r"/", MainHandler),
            (r"/(add)/(\d*)", MainHandler),
            (r"/(cancel)/(\d*)", MainHandler),

if __name__ == "__main__":


for a in `seq 12 120`; do curl$a; done

然后取消一些...注意 - 它只需要 Tornado。

这个例子非常简单 gen.sleep 是你的繁重的任务。当然不是所有的工作都像cancel-able方式实现的那么简单。