如何在超时的情况下取消龙卷风中挂起的异步任务?

How can I cancel a hanging asyncronous task in tornado, with a timeout?

我的设置是 python 龙卷风服务器,它使用 ThreadPoolExecutor 异步处理任务。在某些情况下,任务可能会变成无限循环。使用 with_timeout 装饰器,我设法捕获了超时异常和 return 客户端的错误结果。问题是任务还在后台运行。如何在 ThreadPoolExecutor 中停止来自 运行 的任务?还是可以取消Future? 这是重现问题的代码。 运行 使用 tornado 4 和 concurrent.futures 库的代码并转到 http://localhost:8888/test

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()

Future 实例本身在实际执行后无法取消,只有在处于挂起状态时才能取消。这是注释 in the docs:

cancel()

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

因此,要中止您在后台 运行 的方法的唯一方法是实际将逻辑插入到您的潜在无限循环中,以便在您告诉它时它可以中止。对于您的示例,您可以使用 threading.Event:

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self, event):
        i = 0
        while not event.is_set():
            print i
            i = i + 1

    @tornado.gen.coroutine
    def get(self):
        event = threading.Event()
        future = self.test_func(event)
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            future.cancel() # Might not work, depending on how busy the Executor is
            event.set()
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])