如何在同步环境中执行 Tornado 协程?

How to execute Tornado coroutine inside of synchronous environment?

我有一些 Tornado 的协程相关问题。

有一些python-model A,有执行某些功能的能力。该功能可以从模型外部设置。我不能改变模型本身,但我可以传递任何我想要的功能。我试图通过我的函数教它与 Tornado 的 ioloop 一起工作,但我做不到。

这是片段:

import functools
import pprint
from tornado import gen
from tornado import ioloop

class A:
    f = None
    def execute(self):
      return self.f()
    pass

@gen.coroutine
def genlist():
    raise gen.Return(range(1, 10))

@gen.coroutine
def some_work():
    a = A()
    a.f = functools.partial(
        ioloop.IOLoop.instance().run_sync,
        lambda: genlist())
    print "a.f set"
    raise gen.Return(a)

@gen.coroutine
def main():
    a = yield some_work()
    retval = a.execute()
    raise gen.Return(retval)


if __name__ == "__main__":
    pprint.pprint(ioloop.IOLoop.current().run_sync(main))

所以问题是我在代码的一部分设置了功能,但在另一部分使用模型的方法执行它。

现在,Tornado 4.2.1 给了我“IOLoop 已经 运行”,但在 Tornado 3.1.1 中它有效(但我不知道具体如何).

我知道接下来的事情:

  1. 我可以创建新的 ioloop,但我想使用现有的 ioloop。
  2. 我可以用一些知道 genlist 的结果是 Future 的函数来包装 genlist,但我不知道如何阻止执行,直到 future 的结果将在同步函数中设置。

此外,我不能将 a.execute() 的结果用作未来对象,因为 a.execute() 可以从其他部分调用代码,即它应该 return 列出实例。

所以,我的问题是:是否有机会使用当前的 IOLoop 从同步模型的方法中执行异步 genlist

您不能在此处重新启动外部 IOLoop。您有三个选择:

  1. 在任何地方都使用异步接口:将 a.execute() 和堆栈顶部的所有内容更改为协程。这是基于 Tornado 的应用程序的常用模式;试图跨越同步和异步世界是困难的,最好站在一边或另一边。
  2. 在临时 IOLoop 上使用 run_sync()。这就是 Tornado 的同步 tornado.httpclient.HTTPClient 所做的,这使得从另一个 IOLoop 中调用是安全的。但是,如果您这样做,外部 IOLoop 将保持阻塞状态,因此通过使 genlist 异步,您将一无所获。
  3. 运行 a.execute 在一个单独的线程上并回调主 IOLoop 的线程以获取内部函数。如果 a.execute 不能异步,这是避免在 运行.

    时阻塞 IOLoop 的唯一方法
    executor = concurrent.futures.ThreadPoolExecutor(8)
    
    @gen.coroutine
    def some_work():
        a = A()
        def adapter():
            # Convert the thread-unsafe tornado.concurrent.Future
            # to a thread-safe concurrent.futures.Future.
            # Note that everything including chain_future must happen
            # on the IOLoop thread.
            future = concurrent.futures.Future()
            ioloop.IOLoop.instance().add_callback(
                lambda: tornado.concurrent.chain_future(
                    genlist(), future)
            return future.result()
        a.f = adapter
        print "a.f set"
        raise gen.Return(a)
    
    @gen.coroutine
    def main():
        a = yield some_work()
        retval = yield executor.submit(a.execute)
        raise gen.Return(retval)
    

比如说,你的函数看起来像这样:

@gen.coroutine
def foo():
    # does slow things

@concurrent.run_on_executor
def bar(i=1):
    # does slow things

您可以 运行 foo() 像这样:

from tornado.ioloop import IOLoop
loop = IOLoop.current()

loop.run_sync(foo)

您可以 运行 bar(..),或任何采用 args 的协程,如下所示:

from functools import partial
from tornado.ioloop import IOLoop

loop = IOLoop.current()

f = partial(bar, i=100)
loop.run_sync(f)