如何在超时的情况下取消龙卷风中挂起的异步任务?
How can I cancel a hanging asyncronous task in tornado, with a timeout?
我的设置是 python 龙卷风服务器,它使用 ThreadPoolExecutor
异步处理任务。在某些情况下,任务可能会变成无限循环。使用 with_timeout
装饰器,我设法捕获了超时异常和 return 客户端的错误结果。问题是任务还在后台运行。如何在 ThreadPoolExecutor
中停止来自 运行 的任务?还是可以取消Future
?
这是重现问题的代码。 运行 使用 tornado 4 和 concurrent.futures 库的代码并转到 http://localhost:8888/test
from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10
class MainHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
@run_on_executor
def test_func(self):
...
#infinite loop might be here
...
@tornado.gen.coroutine
def get(self):
future = self.test_func()
try:
result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
self.write({'status' : 0})
self.finish()
except Exception, e:
#how to cancel the task here if it was timeout
future.cancel() # <-- Does not work
self.write({'status' : 100})
self.finish()
application = tornado.web.Application([
(r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()
Future
实例本身在实际执行后无法取消,只有在处于挂起状态时才能取消。这是注释 in the docs:
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False
,
otherwise the call will be cancelled and the method will return True
.
因此,要中止您在后台 运行 的方法的唯一方法是实际将逻辑插入到您的潜在无限循环中,以便在您告诉它时它可以中止。对于您的示例,您可以使用 threading.Event
:
class MainHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
@run_on_executor
def test_func(self, event):
i = 0
while not event.is_set():
print i
i = i + 1
@tornado.gen.coroutine
def get(self):
event = threading.Event()
future = self.test_func(event)
try:
result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
self.write({'status' : 0})
self.finish()
except Exception, e:
future.cancel() # Might not work, depending on how busy the Executor is
event.set()
self.write({'status' : 100})
self.finish()
application = tornado.web.Application([
(r"/test", MainHandler),
])
我的设置是 python 龙卷风服务器,它使用 ThreadPoolExecutor
异步处理任务。在某些情况下,任务可能会变成无限循环。使用 with_timeout
装饰器,我设法捕获了超时异常和 return 客户端的错误结果。问题是任务还在后台运行。如何在 ThreadPoolExecutor
中停止来自 运行 的任务?还是可以取消Future
?
这是重现问题的代码。 运行 使用 tornado 4 和 concurrent.futures 库的代码并转到 http://localhost:8888/test
from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10
class MainHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
@run_on_executor
def test_func(self):
...
#infinite loop might be here
...
@tornado.gen.coroutine
def get(self):
future = self.test_func()
try:
result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
self.write({'status' : 0})
self.finish()
except Exception, e:
#how to cancel the task here if it was timeout
future.cancel() # <-- Does not work
self.write({'status' : 100})
self.finish()
application = tornado.web.Application([
(r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()
Future
实例本身在实际执行后无法取消,只有在处于挂起状态时才能取消。这是注释 in the docs:
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return
False
, otherwise the call will be cancelled and the method will returnTrue
.
因此,要中止您在后台 运行 的方法的唯一方法是实际将逻辑插入到您的潜在无限循环中,以便在您告诉它时它可以中止。对于您的示例,您可以使用 threading.Event
:
class MainHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
@run_on_executor
def test_func(self, event):
i = 0
while not event.is_set():
print i
i = i + 1
@tornado.gen.coroutine
def get(self):
event = threading.Event()
future = self.test_func(event)
try:
result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
self.write({'status' : 0})
self.finish()
except Exception, e:
future.cancel() # Might not work, depending on how busy the Executor is
event.set()
self.write({'status' : 100})
self.finish()
application = tornado.web.Application([
(r"/test", MainHandler),
])