如何在同步环境中执行 Tornado 协程?
How to execute Tornado coroutine inside of synchronous environment?
我有一些 Tornado 的协程相关问题。
有一些python-model A,有执行某些功能的能力。该功能可以从模型外部设置。我不能改变模型本身,但我可以传递任何我想要的功能。我试图通过我的函数教它与 Tornado 的 ioloop 一起工作,但我做不到。
这是片段:
import functools
import pprint
from tornado import gen
from tornado import ioloop
class A:
f = None
def execute(self):
return self.f()
pass
@gen.coroutine
def genlist():
raise gen.Return(range(1, 10))
@gen.coroutine
def some_work():
a = A()
a.f = functools.partial(
ioloop.IOLoop.instance().run_sync,
lambda: genlist())
print "a.f set"
raise gen.Return(a)
@gen.coroutine
def main():
a = yield some_work()
retval = a.execute()
raise gen.Return(retval)
if __name__ == "__main__":
pprint.pprint(ioloop.IOLoop.current().run_sync(main))
所以问题是我在代码的一部分设置了功能,但在另一部分使用模型的方法执行它。
现在,Tornado 4.2.1 给了我“IOLoop 已经 运行”,但在 Tornado 3.1.1 中它有效(但我不知道具体如何).
我知道接下来的事情:
- 我可以创建新的 ioloop,但我想使用现有的 ioloop。
- 我可以用一些知道 genlist 的结果是 Future 的函数来包装 genlist,但我不知道如何阻止执行,直到 future 的结果将在同步函数中设置。
此外,我不能将 a.execute() 的结果用作未来对象,因为 a.execute() 可以从其他部分调用代码,即它应该 return 列出实例。
所以,我的问题是:是否有机会使用当前的 IOLoop 从同步模型的方法中执行异步 genlist
?
您不能在此处重新启动外部 IOLoop。您有三个选择:
- 在任何地方都使用异步接口:将
a.execute()
和堆栈顶部的所有内容更改为协程。这是基于 Tornado 的应用程序的常用模式;试图跨越同步和异步世界是困难的,最好站在一边或另一边。
- 在临时
IOLoop
上使用 run_sync()
。这就是 Tornado 的同步 tornado.httpclient.HTTPClient
所做的,这使得从另一个 IOLoop
中调用是安全的。但是,如果您这样做,外部 IOLoop 将保持阻塞状态,因此通过使 genlist
异步,您将一无所获。
运行 a.execute
在一个单独的线程上并回调主 IOLoop 的线程以获取内部函数。如果 a.execute
不能异步,这是避免在 运行.
时阻塞 IOLoop 的唯一方法
executor = concurrent.futures.ThreadPoolExecutor(8)
@gen.coroutine
def some_work():
a = A()
def adapter():
# Convert the thread-unsafe tornado.concurrent.Future
# to a thread-safe concurrent.futures.Future.
# Note that everything including chain_future must happen
# on the IOLoop thread.
future = concurrent.futures.Future()
ioloop.IOLoop.instance().add_callback(
lambda: tornado.concurrent.chain_future(
genlist(), future)
return future.result()
a.f = adapter
print "a.f set"
raise gen.Return(a)
@gen.coroutine
def main():
a = yield some_work()
retval = yield executor.submit(a.execute)
raise gen.Return(retval)
比如说,你的函数看起来像这样:
@gen.coroutine
def foo():
# does slow things
或
@concurrent.run_on_executor
def bar(i=1):
# does slow things
您可以 运行 foo()
像这样:
from tornado.ioloop import IOLoop
loop = IOLoop.current()
loop.run_sync(foo)
您可以 运行 bar(..)
,或任何采用 args 的协程,如下所示:
from functools import partial
from tornado.ioloop import IOLoop
loop = IOLoop.current()
f = partial(bar, i=100)
loop.run_sync(f)
我有一些 Tornado 的协程相关问题。
有一些python-model A,有执行某些功能的能力。该功能可以从模型外部设置。我不能改变模型本身,但我可以传递任何我想要的功能。我试图通过我的函数教它与 Tornado 的 ioloop 一起工作,但我做不到。
这是片段:
import functools
import pprint
from tornado import gen
from tornado import ioloop
class A:
f = None
def execute(self):
return self.f()
pass
@gen.coroutine
def genlist():
raise gen.Return(range(1, 10))
@gen.coroutine
def some_work():
a = A()
a.f = functools.partial(
ioloop.IOLoop.instance().run_sync,
lambda: genlist())
print "a.f set"
raise gen.Return(a)
@gen.coroutine
def main():
a = yield some_work()
retval = a.execute()
raise gen.Return(retval)
if __name__ == "__main__":
pprint.pprint(ioloop.IOLoop.current().run_sync(main))
所以问题是我在代码的一部分设置了功能,但在另一部分使用模型的方法执行它。
现在,Tornado 4.2.1 给了我“IOLoop 已经 运行”,但在 Tornado 3.1.1 中它有效(但我不知道具体如何).
我知道接下来的事情:
- 我可以创建新的 ioloop,但我想使用现有的 ioloop。
- 我可以用一些知道 genlist 的结果是 Future 的函数来包装 genlist,但我不知道如何阻止执行,直到 future 的结果将在同步函数中设置。
此外,我不能将 a.execute() 的结果用作未来对象,因为 a.execute() 可以从其他部分调用代码,即它应该 return 列出实例。
所以,我的问题是:是否有机会使用当前的 IOLoop 从同步模型的方法中执行异步 genlist
?
您不能在此处重新启动外部 IOLoop。您有三个选择:
- 在任何地方都使用异步接口:将
a.execute()
和堆栈顶部的所有内容更改为协程。这是基于 Tornado 的应用程序的常用模式;试图跨越同步和异步世界是困难的,最好站在一边或另一边。 - 在临时
IOLoop
上使用run_sync()
。这就是 Tornado 的同步tornado.httpclient.HTTPClient
所做的,这使得从另一个IOLoop
中调用是安全的。但是,如果您这样做,外部 IOLoop 将保持阻塞状态,因此通过使genlist
异步,您将一无所获。 运行
时阻塞 IOLoop 的唯一方法a.execute
在一个单独的线程上并回调主 IOLoop 的线程以获取内部函数。如果a.execute
不能异步,这是避免在 运行.executor = concurrent.futures.ThreadPoolExecutor(8) @gen.coroutine def some_work(): a = A() def adapter(): # Convert the thread-unsafe tornado.concurrent.Future # to a thread-safe concurrent.futures.Future. # Note that everything including chain_future must happen # on the IOLoop thread. future = concurrent.futures.Future() ioloop.IOLoop.instance().add_callback( lambda: tornado.concurrent.chain_future( genlist(), future) return future.result() a.f = adapter print "a.f set" raise gen.Return(a) @gen.coroutine def main(): a = yield some_work() retval = yield executor.submit(a.execute) raise gen.Return(retval)
比如说,你的函数看起来像这样:
@gen.coroutine
def foo():
# does slow things
或
@concurrent.run_on_executor
def bar(i=1):
# does slow things
您可以 运行 foo()
像这样:
from tornado.ioloop import IOLoop
loop = IOLoop.current()
loop.run_sync(foo)
您可以 运行 bar(..)
,或任何采用 args 的协程,如下所示:
from functools import partial
from tornado.ioloop import IOLoop
loop = IOLoop.current()
f = partial(bar, i=100)
loop.run_sync(f)